http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java index b155005..953c4fa 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java @@ -33,7 +33,6 @@ public final class FloatSerializer extends TypeSerializerSingleton<Float> { private static final Float ZERO = Float.valueOf(0); - @Override public boolean isImmutableType() { return true; @@ -83,4 +82,10 @@ public final class FloatSerializer extends TypeSerializerSingleton<Float> { public boolean canEqual(Object obj) { return obj instanceof FloatSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(FloatValueSerializer.class.getCanonicalName()); + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java index 3ccd88a..2542be9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java @@ -31,8 +31,7 @@ public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> { private static final long serialVersionUID = 1L; public static final FloatValueSerializer INSTANCE = new FloatValueSerializer(); - - + @Override public boolean isImmutableType() { return false; @@ -84,4 +83,10 @@ public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> { public boolean canEqual(Object obj) { return obj instanceof FloatValueSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(FloatSerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java index f35d71b..fe61ab3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.lang.reflect.Array; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -31,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * A serializer for arrays of objects. * - * @param <C> The component type + * @param <C> The component type. */ @Internal public final class GenericArraySerializer<C> extends TypeSerializer<C[]> { @@ -186,4 +188,36 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> { public String toString() { return "Serializer " + componentClass.getName() + "[]"; } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public GenericArraySerializerConfigSnapshot snapshotConfiguration() { + return new GenericArraySerializerConfigSnapshot<>(componentClass, componentSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<C[]> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof GenericArraySerializerConfigSnapshot) { + final GenericArraySerializerConfigSnapshot config = (GenericArraySerializerConfigSnapshot) configSnapshot; + + if (componentClass.equals(config.getComponentClass())) { + CompatibilityResult<C> compatResult = componentSerializer.ensureCompatibility( + config.getSingleNestedSerializerConfigSnapshot()); + + if (!compatResult.requiresMigration()) { + return 
CompatibilityResult.compatible(); // component serializer needs no migration, so neither does the array serializer + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new GenericArraySerializer<>( + componentClass, + compatResult.getConvertDeserializer())); + } + } + } + + return CompatibilityResult.requiresMigration(null); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java new file mode 100644 index 0000000..e78eb6c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * Point-in-time configuration of a {@link GenericArraySerializer}. + * + * @param <C> The component type. + */ +@Internal +public final class GenericArraySerializerConfigSnapshot<C> extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + private Class<C> componentClass; + + /** This empty nullary constructor is required for deserializing the configuration. 
*/ + public GenericArraySerializerConfigSnapshot() {} + + public GenericArraySerializerConfigSnapshot( + Class<C> componentClass, + TypeSerializerConfigSnapshot componentSerializerConfigSnapshot) { + + super(componentSerializerConfigSnapshot); + + this.componentClass = Preconditions.checkNotNull(componentClass); + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + + InstantiationUtil.serializeObject(new DataOutputViewStream(out), componentClass); + } + + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + + try { + componentClass = InstantiationUtil.deserializeObject(new DataInputViewStream(in), getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Could not find requested element class in classpath.", e); + } + } + + @Override + public int getVersion() { + return VERSION; + } + + public Class<C> getComponentClass() { + return componentClass; + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj) + && (obj instanceof GenericArraySerializerConfigSnapshot) + && (componentClass.equals(((GenericArraySerializerConfigSnapshot) obj).getComponentClass())); + } + + @Override + public int hashCode() { + return super.hashCode() * 31 + componentClass.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java index de6b326..e046e8e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java @@ -33,7 +33,6 @@ public final class IntSerializer 
extends TypeSerializerSingleton<Integer> { private static final Integer ZERO = Integer.valueOf(0); - @Override public boolean isImmutableType() { return true; @@ -83,4 +82,10 @@ public final class IntSerializer extends TypeSerializerSingleton<Integer> { public boolean canEqual(Object obj) { return obj instanceof IntSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(IntValueSerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java index 1a90313..65f5e35 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java @@ -31,8 +31,7 @@ public final class IntValueSerializer extends TypeSerializerSingleton<IntValue> private static final long serialVersionUID = 1L; public static final IntValueSerializer INSTANCE = new IntValueSerializer(); - - + @Override public boolean isImmutableType() { return false; @@ -84,4 +83,10 @@ public final class IntValueSerializer extends TypeSerializerSingleton<IntValue> public boolean canEqual(Object obj) { return obj instanceof IntValueSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(IntSerializer.class.getCanonicalName()); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java index 5d5e8f8..02d22de 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java @@ -19,7 +19,9 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -39,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <T> The type of element in the list. 
*/ @Internal -public class ListSerializer<T> extends TypeSerializer<List<T>> { +public final class ListSerializer<T> extends TypeSerializer<List<T>> { private static final long serialVersionUID = 1119562170939152304L; @@ -166,4 +168,30 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> { public int hashCode() { return elementSerializer.hashCode(); } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public CollectionSerializerConfigSnapshot snapshotConfiguration() { + return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<List<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof CollectionSerializerConfigSnapshot) { + CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility( + ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + + if (!compatResult.requiresMigration()) { + return CompatibilityResult.compatible(); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new ListSerializer<>(compatResult.getConvertDeserializer())); + } + } + + return CompatibilityResult.requiresMigration(null); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java index 086a2fc..cbdc3db 100644 --- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java @@ -33,7 +33,6 @@ public final class LongSerializer extends TypeSerializerSingleton<Long> { private static final Long ZERO = Long.valueOf(0); - @Override public boolean isImmutableType() { return true; @@ -83,4 +82,10 @@ public final class LongSerializer extends TypeSerializerSingleton<Long> { public boolean canEqual(Object obj) { return obj instanceof LongSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(LongValueSerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java index 5d94325..dcf3805 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java @@ -31,8 +31,7 @@ public final class LongValueSerializer extends TypeSerializerSingleton<LongValue private static final long serialVersionUID = 1L; public static final LongValueSerializer INSTANCE = new LongValueSerializer(); - - + @Override public boolean isImmutableType() { return false; @@ -84,4 +83,10 @@ public final class LongValueSerializer extends TypeSerializerSingleton<LongValue public boolean canEqual(Object obj) { return obj instanceof LongValueSerializer; } + + @Override + protected boolean 
isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(LongSerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java index 5e1a3bf..50900e4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java @@ -18,7 +18,10 @@ package org.apache.flink.api.common.typeutils.base; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; @@ -38,7 +41,8 @@ import java.util.HashMap; * @param <K> The type of the keys in the map. * @param <V> The type of the values in the map. 
*/ -public class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> { +@Internal +public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> { private static final long serialVersionUID = -6885593032367050078L; @@ -190,4 +194,37 @@ public class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> { public int hashCode() { return keySerializer.hashCode() * 31 + valueSerializer.hashCode(); } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public MapSerializerConfigSnapshot snapshotConfiguration() { + return new MapSerializerConfigSnapshot( + keySerializer.snapshotConfiguration(), + valueSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<Map<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof MapSerializerConfigSnapshot) { + TypeSerializerConfigSnapshot[] keyValueSerializerConfigSnapshots = + ((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + + CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]); + CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]); + + if (!keyCompatResult.requiresMigration() && !valueCompatResult.requiresMigration()) { + return CompatibilityResult.compatible(); + } else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new MapSerializer<>( + keyCompatResult.getConvertDeserializer(), + valueCompatResult.getConvertDeserializer())); + } + } + + return CompatibilityResult.requiresMigration(null); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java new file mode 100644 index 0000000..38e1254 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; + +/** + * Configuration snapshot for serializers of maps, containing the + * configuration snapshot of its key serializer and value serializer. 
+ */ +@Internal +public final class MapSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. */ + public MapSerializerConfigSnapshot() {} + + public MapSerializerConfigSnapshot( + TypeSerializerConfigSnapshot keySerializerConfigSnapshot, + TypeSerializerConfigSnapshot valueSerializerConfigSnapshot) { + + super(keySerializerConfigSnapshot, valueSerializerConfigSnapshot); + } + + @Override + public int getVersion() { + return VERSION; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java index b94c0ac..322591d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java @@ -33,7 +33,6 @@ public final class ShortSerializer extends TypeSerializerSingleton<Short> { private static final Short ZERO = Short.valueOf((short)0); - @Override public boolean isImmutableType() { return true; @@ -83,4 +82,10 @@ public final class ShortSerializer extends TypeSerializerSingleton<Short> { public boolean canEqual(Object obj) { return obj instanceof ShortSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(ShortValueSerializer.class.getCanonicalName()); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java index 51b234e..9e90927 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java @@ -31,8 +31,7 @@ public final class ShortValueSerializer extends TypeSerializerSingleton<ShortVal private static final long serialVersionUID = 1L; public static final ShortValueSerializer INSTANCE = new ShortValueSerializer(); - - + @Override public boolean isImmutableType() { return false; @@ -84,4 +83,10 @@ public final class ShortValueSerializer extends TypeSerializerSingleton<ShortVal public boolean canEqual(Object obj) { return obj instanceof ShortValueSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(ShortSerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java index 9e1bbfd..6e30a5f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java @@ -102,4 +102,11 @@ public final class SqlDateSerializer extends TypeSerializerSingleton<Date> { public boolean canEqual(Object obj) { return obj instanceof SqlDateSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(DateSerializer.class.getCanonicalName()) + || identifier.equals(SqlTimeSerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java index 544cf0f..3fa06c4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java @@ -101,4 +101,16 @@ public final class SqlTimeSerializer extends TypeSerializerSingleton<Time> { public boolean canEqual(Object obj) { return obj instanceof SqlTimeSerializer; } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & reconfiguring + // -------------------------------------------------------------------------------------------- + + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(DateSerializer.class.getCanonicalName()) + || identifier.equals(SqlDateSerializer.class.getCanonicalName()); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java index 034a8cc..f2cd54f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java @@ -83,4 +83,10 @@ public final class StringSerializer extends TypeSerializerSingleton<String> { public boolean canEqual(Object obj) { return obj instanceof StringSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(StringValueSerializer.class.getCanonicalName()); // identifiers are serializer class names, so match the sibling serializer, not the StringValue type + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java index 7f86b76..4e7ff97 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java @@ -33,8 +33,7 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV private static final int HIGH_BIT = 0x1 << 7; public static final StringValueSerializer INSTANCE = new StringValueSerializer(); - - + @Override public boolean isImmutableType() { return 
false; @@ -109,4 +108,10 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV public boolean canEqual(Object obj) { return obj instanceof StringValueSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(StringSerializer.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java index c091072..c5decc5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java @@ -19,7 +19,10 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; @Internal public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{ @@ -48,4 +51,33 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{ return false; } } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + // type serializer singletons should always be parameter-less + return new ParameterlessTypeSerializerConfig(getSerializationFormatIdentifier()); + } + + @Override + public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof ParameterlessTypeSerializerConfig + && isCompatibleSerializationFormatIdentifier( + ((ParameterlessTypeSerializerConfig) configSnapshot).getSerializationFormatIdentifier())) { + + return CompatibilityResult.compatible(); + } else { + return CompatibilityResult.requiresMigration(null); + } + } + + /** + * Subclasses can override this if they know that they are also compatible with identifiers of other formats. + */ + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + return identifier.equals(getSerializationFormatIdentifier()); + } + + private String getSerializationFormatIdentifier() { + return getClass().getCanonicalName(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java index 371200c..bf14f72 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java @@ -36,8 +36,7 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet private static final boolean[] EMPTY = new boolean[0]; public static final BooleanPrimitiveArraySerializer INSTANCE = new BooleanPrimitiveArraySerializer(); - - + @Override public boolean isImmutableType() { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java index 0f56149..dbad1df 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java @@ -36,7 +36,7 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton< private static final byte[] EMPTY = new byte[0]; public static final BytePrimitiveArraySerializer INSTANCE = new BytePrimitiveArraySerializer(); - + @Override public boolean isImmutableType() { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java index c3655e9..ae30109 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java @@ -36,8 +36,7 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton< private static final char[] EMPTY = new char[0]; public static final CharPrimitiveArraySerializer INSTANCE = new CharPrimitiveArraySerializer(); - - + @Override public boolean isImmutableType() { return false; 
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java index 2b07cfd..610a22b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java @@ -36,8 +36,7 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto private static final double[] EMPTY = new double[0]; public static final DoublePrimitiveArraySerializer INSTANCE = new DoublePrimitiveArraySerializer(); - - + @Override public boolean isImmutableType() { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java index 1fa4715..50b2b2e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java @@ -36,8 +36,7 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton private static final float[] EMPTY = new float[0]; public static final FloatPrimitiveArraySerializer INSTANCE = 
new FloatPrimitiveArraySerializer(); - - + @Override public boolean isImmutableType() { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java index a5a3e56..f11b46e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java @@ -36,8 +36,7 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{ private static final int[] EMPTY = new int[0]; public static final IntPrimitiveArraySerializer INSTANCE = new IntPrimitiveArraySerializer(); - - + @Override public boolean isImmutableType() { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java index 3ecae36..b0788c9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java @@ -36,8 +36,7 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton< private static final long[] EMPTY = new 
long[0]; public static final LongPrimitiveArraySerializer INSTANCE = new LongPrimitiveArraySerializer(); - - + @Override public boolean isImmutableType() { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java index 7f542c3..107fe03 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java @@ -36,8 +36,7 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton private static final short[] EMPTY = new short[0]; public static final ShortPrimitiveArraySerializer INSTANCE = new ShortPrimitiveArraySerializer(); - - + @Override public boolean isImmutableType() { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java index ccf369a..61a654c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java @@ -38,8 +38,7 @@ public final class StringArraySerializer extends 
TypeSerializerSingleton<String[ private static final String[] EMPTY = new String[0]; public static final StringArraySerializer INSTANCE = new StringArraySerializer(); - - + @Override public boolean isImmutableType() { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java index 8bcd157..c9eeb34 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java @@ -19,6 +19,9 @@ package org.apache.flink.api.java.typeutils.runtime; import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.LinkedHashMap; +import java.util.Map; import org.apache.avro.generic.GenericData; import org.apache.avro.reflect.ReflectDatumReader; @@ -26,13 +29,17 @@ import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.util.Utf8; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.InstantiationUtil; import com.esotericsoftware.kryo.Kryo; +import org.apache.flink.util.Preconditions; import org.objenesis.strategy.StdInstantiatorStrategy; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -51,6 +58,14 @@ 
public final class AvroSerializer<T> extends TypeSerializer<T> { private final Class<T> type; private final Class<? extends T> typeToInstantiate; + + /** + * Map of class tag (using classname as tag) to their Kryo registration. + * + * <p>This map serves as a preview of the final registration result of + * the Kryo instance, taking into account registration overwrites. + */ + private LinkedHashMap<String, KryoRegistration> kryoRegistrations; private transient ReflectDatumWriter<T> writer; private transient ReflectDatumReader<T> reader; @@ -73,6 +88,8 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { this.typeToInstantiate = checkNotNull(typeToInstantiate); InstantiationUtil.checkForInstantiation(typeToInstantiate); + + this.kryoRegistrations = buildKryoRegistrations(type); } // -------------------------------------------------------------------------------------------- @@ -165,14 +182,9 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); kryo.setInstantiatorStrategy(instantiatorStrategy); - // register Avro types. 
- this.kryo.register(GenericData.Array.class, new Serializers.SpecificInstanceCollectionSerializerForArrayList()); - this.kryo.register(Utf8.class); - this.kryo.register(GenericData.EnumSymbol.class); - this.kryo.register(GenericData.Fixed.class); - this.kryo.register(GenericData.StringType.class); - this.kryo.setAsmEnabled(true); - this.kryo.register(type); + kryo.setAsmEnabled(true); + + KryoUtils.applyRegistrations(kryo, kryoRegistrations.values()); } } @@ -201,4 +213,120 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { public boolean canEqual(Object obj) { return obj instanceof AvroSerializer; } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public AvroSerializerConfigSnapshot<T> snapshotConfiguration() { + return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations); + } + + @SuppressWarnings("unchecked") + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSerializerConfigSnapshot) { + final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot; + + if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) { + // resolve Kryo registrations; currently, since the Kryo registrations in Avro + // are fixed, there shouldn't be a problem with the resolution here. 
+ + LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations(); + oldRegistrations.putAll(kryoRegistrations); + // a dummy placeholder still present after the merge means an old registration could not be restored + for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : oldRegistrations.entrySet()) { + if (reconfiguredRegistrationEntry.getValue().isDummy()) { + return CompatibilityResult.requiresMigration(null); + } + } + + this.kryoRegistrations = oldRegistrations; + return CompatibilityResult.compatible(); + } + } + + // ends up here if the preceding serializer is not + // an AvroSerializer, or serialized data type has changed + return CompatibilityResult.requiresMigration(null); + } + + public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> { + + private static final int VERSION = 1; + + private Class<? extends T> typeToInstantiate; + + public AvroSerializerConfigSnapshot() {} + + public AvroSerializerConfigSnapshot( + Class<T> baseType, + Class<? extends T> typeToInstantiate, + LinkedHashMap<String, KryoRegistration> kryoRegistrations) { + + super(baseType, kryoRegistrations); + this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate); + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + + out.writeUTF(typeToInstantiate.getName()); + } + + @SuppressWarnings("unchecked") + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + + String classname = in.readUTF(); + try { + typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find requested class " + classname + " in classpath.", e); + } + } + + @Override + public int getVersion() { + return VERSION; + } + + public Class<? 
extends T> getTypeToInstantiate() { + return typeToInstantiate; + } + } + + // -------------------------------------------------------------------------------------------- + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + // kryoRegistrations may be null if this Avro serializer is deserialized from an old version + if (kryoRegistrations == null) { + this.kryoRegistrations = buildKryoRegistrations(type); + } + } + + private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) { + final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>(); + + // register Avro types. + registrations.put( + GenericData.Array.class.getName(), + new KryoRegistration( + GenericData.Array.class, + new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); + registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class)); + registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class)); + registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class)); + registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class)); + + // register the serialized data type + registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType)); + + return registrations; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java index 1f3fcbc..46b93c2 100644 --- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java @@ -21,7 +21,10 @@ package org.apache.flink.api.java.typeutils.runtime; import java.io.IOException; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.CopyableValue; @@ -30,7 +33,7 @@ import org.apache.flink.util.InstantiationUtil; import static org.apache.flink.util.Preconditions.checkNotNull; @Internal -public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSerializer<T> { +public final class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSerializer<T> { private static final long serialVersionUID = 1L; @@ -128,4 +131,41 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer public boolean canEqual(Object obj) { return obj instanceof CopyableValueSerializer; } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public CopyableValueSerializerConfigSnapshot<T> snapshotConfiguration() { + return new CopyableValueSerializerConfigSnapshot<>(valueClass); + } + + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof CopyableValueSerializerConfigSnapshot + && 
valueClass.equals(((CopyableValueSerializerConfigSnapshot) configSnapshot).getTypeClass())) { + return CompatibilityResult.compatible(); + } else { + return CompatibilityResult.requiresMigration(null); + } + } + + public static final class CopyableValueSerializerConfigSnapshot<T extends CopyableValue<T>> + extends GenericTypeSerializerConfigSnapshot<T> { + + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. */ + public CopyableValueSerializerConfigSnapshot() {} + + public CopyableValueSerializerConfigSnapshot(Class<T> copyableValueClass) { + super(copyableValueClass); + } + + @Override + public int getVersion() { + return VERSION; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java index d9018da..c025d61 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java @@ -19,7 +19,9 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.Either; @@ -183,4 +185,39 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> { public int hashCode() { return 17 * leftSerializer.hashCode() + 
rightSerializer.hashCode(); } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public EitherSerializerConfigSnapshot snapshotConfiguration() { + return new EitherSerializerConfigSnapshot( + leftSerializer.snapshotConfiguration(), + rightSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<Either<L, R>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof EitherSerializerConfigSnapshot) { + TypeSerializerConfigSnapshot[] leftRightSerializerConfigSnapshots = + ((EitherSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + + CompatibilityResult<L> leftCompatResult = leftSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[0]); + CompatibilityResult<R> rightCompatResult = rightSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[1]); + + if (!leftCompatResult.requiresMigration() && !rightCompatResult.requiresMigration()) { + return CompatibilityResult.compatible(); + } else { + if (leftCompatResult.getConvertDeserializer() != null && rightCompatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new EitherSerializer<>( + leftCompatResult.getConvertDeserializer(), + rightCompatResult.getConvertDeserializer())); + } + } + } + + return CompatibilityResult.requiresMigration(null); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java new file mode 100644 index 0000000..473d438 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.types.Either; + +/** + * Configuration snapshot for serializers of the {@link Either} type, + * containing configuration snapshots of the Left and Right serializers. + */ +@Internal +public final class EitherSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. 
*/ + public EitherSerializerConfigSnapshot() {} + + public EitherSerializerConfigSnapshot( + TypeSerializerConfigSnapshot leftSerializerConfigSnapshot, + TypeSerializerConfigSnapshot rightSerializerConfigSnapshot) { + + super(leftSerializerConfigSnapshot, rightSerializerConfigSnapshot); + } + + @Override + public int getVersion() { + return VERSION; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java new file mode 100644 index 0000000..882073d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * A {@code KryoRegistration} resembles a registered class and its serializer in Kryo. + */ +@Internal +public class KryoRegistration implements Serializable { + + private static final long serialVersionUID = 5375110512910892655L; + + /** IMPORTANT: the order of the enumerations must not change, since their ordinals are used for serialization. */ + public enum SerializerDefinitionType { + UNSPECIFIED, CLASS, INSTANCE + } + + /** + * The registered class. + * + * <p>This can be a dummy class {@link KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass} if + * the class no longer exists when this registration instance was restored. + */ + private final Class<?> registeredClass; + + /** + * Class of the serializer to use for the registered class. + * Exists only if the serializer definition type is {@link SerializerDefinitionType#CLASS}. + * + * <p>This can be a dummy serializer {@link KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass} if + * the serializer class no longer exists when this registration instance was restored. + */ + private final Class<? extends Serializer<?>> serializerClass; + + /** + * A serializable instance of the serializer to use for the registered class. + * Exists only if the serializer definition type is {@link SerializerDefinitionType#INSTANCE}. 
+ * + * <p>This can be a dummy serializer {@link KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass} if + * the serializer class no longer exists or is no longer valid when this registration instance was restored. + */ + private final ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializableSerializerInstance; + + private SerializerDefinitionType serializerDefinitionType; + + public KryoRegistration(Class<?> registeredClass) { + this.registeredClass = Preconditions.checkNotNull(registeredClass); + + this.serializerClass = null; + this.serializableSerializerInstance = null; + + this.serializerDefinitionType = SerializerDefinitionType.UNSPECIFIED; + } + + public KryoRegistration(Class<?> registeredClass, Class<? extends Serializer<?>> serializerClass) { + this.registeredClass = Preconditions.checkNotNull(registeredClass); + + this.serializerClass = Preconditions.checkNotNull(serializerClass); + this.serializableSerializerInstance = null; + + this.serializerDefinitionType = SerializerDefinitionType.CLASS; + } + + public KryoRegistration( + Class<?> registeredClass, + ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializableSerializerInstance) { + this.registeredClass = Preconditions.checkNotNull(registeredClass); + + this.serializerClass = null; + this.serializableSerializerInstance = Preconditions.checkNotNull(serializableSerializerInstance); + + this.serializerDefinitionType = SerializerDefinitionType.INSTANCE; + } + + public Class<?> getRegisteredClass() { + return registeredClass; + } + + public SerializerDefinitionType getSerializerDefinitionType() { + return serializerDefinitionType; + } + + public Class<? extends Serializer<?>> getSerializerClass() { + return serializerClass; + } + + public ExecutionConfig.SerializableSerializer<? 
extends Serializer<?>> getSerializableSerializerInstance() { + return serializableSerializerInstance; + } + + public Serializer<?> getSerializer(Kryo kryo) { + switch (serializerDefinitionType) { + case UNSPECIFIED: + return null; + case CLASS: + return ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, registeredClass); + case INSTANCE: + return serializableSerializerInstance.getSerializer(); + default: + // this should not happen; adding as a guard for the future + throw new IllegalStateException( + "Unrecognized Kryo registration serializer definition type: " + serializerDefinitionType); + } + } + + public boolean isDummy() { + return registeredClass.equals(KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class) + || (serializerClass != null + && serializerClass.equals(KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass.class)) + || (serializableSerializerInstance != null + && serializableSerializerInstance.getSerializer() instanceof KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null) { + return false; + } + + if (obj instanceof KryoRegistration) { + KryoRegistration other = (KryoRegistration) obj; + + // we cannot include the serializer instances here because they don't implement the equals method + return serializerDefinitionType == other.serializerDefinitionType + && registeredClass == other.registeredClass + && serializerClass == other.serializerClass; + } else { + return false; + } + } + + @Override + public int hashCode() { + int result = serializerDefinitionType.hashCode(); + result = 31 * result + registeredClass.hashCode(); + + if (serializerClass != null) { + result = 31 * result + serializerClass.hashCode(); + } + + return result; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java new file mode 100644 index 0000000..3a42d69 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot; +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InvalidClassException; +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Configuration snapshot base class for serializers that use Kryo for serialization. + * + * <p>The configuration captures the order of Kryo serializer registrations, so that new + * Kryo serializers can determine how to reconfigure their registration order to retain + * backwards compatibility. + * + * @param <T> the data type that the Kryo serializer handles. + */ +@Internal +public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends GenericTypeSerializerConfigSnapshot<T> { + + private static final Logger LOG = LoggerFactory.getLogger(KryoRegistrationSerializerConfigSnapshot.class); + + /** Map of class tag to the registration, with ordering. */ + private LinkedHashMap<String, KryoRegistration> kryoRegistrations; + + /** This empty nullary constructor is required for deserializing the configuration.
*/ + public KryoRegistrationSerializerConfigSnapshot() {} + + public KryoRegistrationSerializerConfigSnapshot( + Class<T> typeClass, + LinkedHashMap<String, KryoRegistration> kryoRegistrations) { + + super(typeClass); + + this.kryoRegistrations = Preconditions.checkNotNull(kryoRegistrations); + } + + /** Writes the parent snapshot data, then the registration count followed by each class tag and its registration in map order. */ + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + + out.writeInt(kryoRegistrations.size()); + for (Map.Entry<String, KryoRegistration> kryoRegistrationEntry : kryoRegistrations.entrySet()) { + out.writeUTF(kryoRegistrationEntry.getKey()); + new KryoRegistrationSerializationProxy<>(kryoRegistrationEntry.getValue()).write(out); + } + } + + /** Reads back what {@link #write(DataOutputView)} wrote, rebuilding the registration map in the stored order. */ + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + + int numKryoRegistrations = in.readInt(); + kryoRegistrations = new LinkedHashMap<>(numKryoRegistrations); + + KryoRegistrationSerializationProxy<?> proxy; + for (int i = 0; i < numKryoRegistrations; i++) { + String classTag = in.readUTF(); + + proxy = new KryoRegistrationSerializationProxy<>(getUserCodeClassLoader()); + proxy.read(in); + + kryoRegistrations.put(classTag, proxy.kryoRegistration); + } + } + + /** Returns the class-tag-to-registration map held by this snapshot. */ + public LinkedHashMap<String, KryoRegistration> getKryoRegistrations() { + return kryoRegistrations; + } + + // -------------------------------------------------------------------------------------------- + + /** Reads and writes a single {@link KryoRegistration}; classes that cannot be loaded on read are replaced by dummy placeholders. */ + private static class KryoRegistrationSerializationProxy<RC> implements IOReadableWritable { + + private ClassLoader userCodeClassLoader; + + private KryoRegistration kryoRegistration; + + public KryoRegistrationSerializationProxy(ClassLoader userCodeClassLoader) { + this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); + } + + public KryoRegistrationSerializationProxy(KryoRegistration kryoRegistration) { + this.kryoRegistration = Preconditions.checkNotNull(kryoRegistration); + } + + @Override + public void write(DataOutputView out) throws IOException { + 
out.writeUTF(kryoRegistration.getRegisteredClass().getName()); + + final KryoRegistration.SerializerDefinitionType serializerDefinitionType = kryoRegistration.getSerializerDefinitionType(); + + out.writeInt(serializerDefinitionType.ordinal()); + switch (serializerDefinitionType) { + case UNSPECIFIED: + // nothing else to write + break; + case CLASS: + out.writeUTF(kryoRegistration.getSerializerClass().getName()); + break; + case INSTANCE: + InstantiationUtil.serializeObject(new DataOutputViewStream(out), kryoRegistration.getSerializableSerializerInstance()); + break; + default: + // this should not happen; adding as a guard for the future + throw new IllegalStateException( + "Unrecognized Kryo registration serializer definition type: " + serializerDefinitionType); + } + } + + @SuppressWarnings("unchecked") + @Override + public void read(DataInputView in) throws IOException { + String registeredClassname = in.readUTF(); + + Class<RC> registeredClass; + try { + registeredClass = (Class<RC>) Class.forName(registeredClassname, true, userCodeClassLoader); + } catch (ClassNotFoundException e) { + LOG.warn("Cannot find registered class " + registeredClassname + " for Kryo serialization in classpath;" + + " using a dummy class as a placeholder.", e); + + registeredClass = (Class) DummyRegisteredClass.class; + } + + final KryoRegistration.SerializerDefinitionType serializerDefinitionType = + KryoRegistration.SerializerDefinitionType.values()[in.readInt()]; + + switch (serializerDefinitionType) { + case UNSPECIFIED: + kryoRegistration = new KryoRegistration(registeredClass); + break; + + case CLASS: + String serializerClassname = in.readUTF(); + + Class serializerClass; + try { + serializerClass = Class.forName(serializerClassname, true, userCodeClassLoader); + } catch (ClassNotFoundException e) { + LOG.warn("Cannot find registered Kryo serializer class for class " + registeredClassname + + " in classpath; using a dummy Kryo serializer that should be replaced as soon as" + + 
" a new Kryo serializer for the class is present", e); + + serializerClass = DummyKryoSerializerClass.class; + } + + kryoRegistration = new KryoRegistration(registeredClass, serializerClass); + break; + + case INSTANCE: + ExecutionConfig.SerializableSerializer<? extends Serializer<RC>> serializerInstance; + try { + serializerInstance = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader); + } catch (ClassNotFoundException e) { + LOG.warn("Cannot find registered Kryo serializer class for class " + registeredClassname + + " in classpath; using a dummy Kryo serializer that should be replaced as soon as" + + " a new Kryo serializer for the class is present", e); + + serializerInstance = new ExecutionConfig.SerializableSerializer<>(new DummyKryoSerializerClass<RC>()); + } catch (InvalidClassException e) { + LOG.warn("The registered Kryo serializer class for class " + registeredClassname + + " has changed and is no longer valid; using a dummy Kryo serializer that should be replaced" + + " as soon as a new Kryo serializer for the class is present.", e); + + serializerInstance = new ExecutionConfig.SerializableSerializer<>(new DummyKryoSerializerClass<RC>()); + } + + kryoRegistration = new KryoRegistration(registeredClass, serializerInstance); + break; + + default: + // this should not happen; adding as a guard for the future + throw new IllegalStateException( + "Unrecognized Kryo registration serializer definition type: " + serializerDefinitionType); + } + } + } + + /** + * Placeholder dummy for a previously registered class that can no longer be found in classpath on restore. + */ + public static class DummyRegisteredClass {} + + /** + * Placeholder dummy for a previously registered Kryo serializer that is no longer valid or in classpath on restore.
+ */ + public static class DummyKryoSerializerClass<RC> extends Serializer<RC> implements Serializable { + + private static final long serialVersionUID = -6172780797425739308L; + + @Override + public void write(Kryo kryo, Output output, Object o) { + throw new UnsupportedOperationException( + "This exception indicates that you're trying to write a data type" + + " that no longer has a valid Kryo serializer registered for it."); + } + + @Override + public Object read(Kryo kryo, Input input, Class aClass) { + throw new UnsupportedOperationException( + "This exception indicates that you're trying to read a data type" + + " that no longer has a valid Kryo serializer registered for it."); + } + } + + /** Compares the parent snapshot state and the registrations; {@code obj} is cast to this base class so that every subclass snapshot can be compared without a ClassCastException. */ + @Override + public boolean equals(Object obj) { + return super.equals(obj) + && obj instanceof KryoRegistrationSerializerConfigSnapshot + && kryoRegistrations.equals(((KryoRegistrationSerializerConfigSnapshot<?>) obj).getKryoRegistrations()); + } + + @Override + public int hashCode() { + return super.hashCode() + kryoRegistrations.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java index 50c46e4..0937ac7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java @@ -20,11 +20,13 @@ package org.apache.flink.api.java.typeutils.runtime; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.util.InstantiationUtil; import java.io.IOException; +import 
java.util.Collection; /** * Convenience methods for Kryo @@ -86,4 +88,29 @@ public class KryoUtils { } } } + + /** + * Registers every given {@link KryoRegistration} with the provided Kryo instance. The registrations are + * expected to be fully resolved already, i.e. no entry is meant to overwrite another one. + * + * <p>Registrations are applied in iteration order, each under the next available registration id of the + * Kryo instance (passing the id explicitly is an extra safeguard against overwrites, not a requirement). + * A registration that yields no serializer is registered with its class and id only. + * + * @param kryo the Kryo instance that receives the registrations + * @param resolvedRegistrations the registrations to apply, already resolved of all possible registration overwrites + */ + public static void applyRegistrations(Kryo kryo, Collection<KryoRegistration> resolvedRegistrations) { + + for (KryoRegistration registration : resolvedRegistrations) { + final Serializer<?> customSerializer = registration.getSerializer(kryo); + final Class<?> registeredClass = registration.getRegisteredClass(); + + if (customSerializer == null) { + kryo.register(registeredClass, kryo.getNextRegistrationId()); + } else { + kryo.register(registeredClass, customSerializer, kryo.getNextRegistrationId()); + } + } + } }