This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 268da6a [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer 268da6a is described below commit 268da6a03323b65d6297e4d2288ead39aba7d388 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Thu Aug 8 07:34:04 2019 +0200 [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer --- .../typeutils/runtime/PojoSerializerSnapshot.java | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java index 9987fae..95276d6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java @@ -474,13 +474,21 @@ public class PojoSerializerSnapshot<T> implements TypeSerializerSnapshot<T> { Iterator<TypeSerializer<?>> serializersForPreexistingRegistrations = Arrays.asList(preExistingRegistrationsCompatibility.getNestedSerializers()).iterator(); - for (Map.Entry<Class<?>, TypeSerializer<?>> registration : newSubclassRegistrations.entrySet()) { - // new registrations should simply be appended to the subclass serializer registry with their new serializers; - // preexisting registrations should use the compatibility-checked serializer - TypeSerializer<?> newRegistration = (reconfiguredSubclassSerializerRegistry.containsKey(registration.getKey())) - ? serializersForPreexistingRegistrations.next() - : registration.getValue(); - reconfiguredSubclassSerializerRegistry.put(registration.getKey(), newRegistration); + // first, replace all restored serializers of subclasses that co-exist in + // the previous and new registrations, with the compatibility-checked serializers + for (Map.Entry<Class<?>, TypeSerializer<?>> oldRegistration : reconfiguredSubclassSerializerRegistry.entrySet()) { + if (newSubclassRegistrations.containsKey(oldRegistration.getKey())) { + oldRegistration.setValue(serializersForPreexistingRegistrations.next()); + } + } + + // then, for all new registration that did not exist before, append it to the registry simply with their + // new serializers + for (Map.Entry<Class<?>, TypeSerializer<?>> newRegistration : newSubclassRegistrations.entrySet()) { + TypeSerializer<?> oldRegistration = reconfiguredSubclassSerializerRegistry.get(newRegistration.getKey()); + if (oldRegistration == null) { + reconfiguredSubclassSerializerRegistry.put(newRegistration.getKey(), newRegistration.getValue()); + } } return decomposeSubclassSerializerRegistry(reconfiguredSubclassSerializerRegistry);