This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 155f8c7278de175a77f757d98301178b57bc421c
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Thu Aug 8 07:34:04 2019 +0200

    [FLINK-13159] Fix incorrect subclass serializer reconfiguration in 
PojoSerializer
---
 .../typeutils/runtime/PojoSerializerSnapshot.java  | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
index 9987fae..95276d6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
@@ -474,13 +474,21 @@ public class PojoSerializerSnapshot<T> implements 
TypeSerializerSnapshot<T> {
                Iterator<TypeSerializer<?>> 
serializersForPreexistingRegistrations =
                        
Arrays.asList(preExistingRegistrationsCompatibility.getNestedSerializers()).iterator();
 
-               for (Map.Entry<Class<?>, TypeSerializer<?>> registration : 
newSubclassRegistrations.entrySet()) {
-                       // new registrations should simply be appended to the 
subclass serializer registry with their new serializers;
-                       // preexisting registrations should use the 
compatibility-checked serializer
-                       TypeSerializer<?> newRegistration = 
(reconfiguredSubclassSerializerRegistry.containsKey(registration.getKey()))
-                               ? serializersForPreexistingRegistrations.next()
-                               : registration.getValue();
-                       
reconfiguredSubclassSerializerRegistry.put(registration.getKey(), 
newRegistration);
+               // first, replace all restored serializers of subclasses that 
co-exist in
+               // the previous and new registrations, with the 
compatibility-checked serializers
+               for (Map.Entry<Class<?>, TypeSerializer<?>> oldRegistration : 
reconfiguredSubclassSerializerRegistry.entrySet()) {
+                       if 
(newSubclassRegistrations.containsKey(oldRegistration.getKey())) {
+                               
oldRegistration.setValue(serializersForPreexistingRegistrations.next());
+                       }
+               }
+
+               // then, for all new registration that did not exist before, 
append it to the registry simply with their
+               // new serializers
+               for (Map.Entry<Class<?>, TypeSerializer<?>> newRegistration : 
newSubclassRegistrations.entrySet()) {
+                       TypeSerializer<?> oldRegistration = 
reconfiguredSubclassSerializerRegistry.get(newRegistration.getKey());
+                       if (oldRegistration == null) {
+                               
reconfiguredSubclassSerializerRegistry.put(newRegistration.getKey(), 
newRegistration.getValue());
+                       }
                }
 
                return 
decomposeSubclassSerializerRegistry(reconfiguredSubclassSerializerRegistry);

Reply via email to