GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182750453

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
    @@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
         * Copy-constructor that does not copy transient fields. They will be initialized once required.
         */
        protected KryoSerializer(KryoSerializer<T> toCopy) {
    -       defaultSerializers = toCopy.defaultSerializers;
    -       defaultSerializerClasses = toCopy.defaultSerializerClasses;
    -       kryoRegistrations = toCopy.kryoRegistrations;
    +       this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
    +       this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
    +       this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
    +       this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
    +
    +       // deep copy the serializer instances in defaultSerializers
    +       for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
    +           toCopy.defaultSerializers.entrySet()) {
    -       type = toCopy.type;
    -       if(type == null){
    -           throw new NullPointerException("Type class cannot be null.");
    +           this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
    +       }
    +
    +       // deep copy the serializer instances in kryoRegistrations
    +       for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
    --- End diff --

One alternative to this loop (though I'm not sure it would be better) is to always make a copy of the `ExecutionConfig.SerializableSerializer` in the `buildKryoRegistrations` method when instantiating its corresponding `KryoRegistration`. See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537. That way, the copy is already made when the registrations are built.
Then, when duplicating the `KryoSerializer`, duplicating the registrations would only be a matter of calling `buildKryoRegistrations` again with the execution config, because that method would already handle stateful serializer registrations properly. IMO, this seems like a cleaner solution. What do you think?
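
To make the suggestion a bit more concrete, here is a rough, self-contained sketch of the idea. It does not use the actual Flink classes: `StatefulSerializer`, `Registration`, `deepCopy` and `buildRegistrations` are hypothetical stand-ins for `ExecutionConfig.SerializableSerializer`, `KryoRegistration`, the copy helper and `buildKryoRegistrations`, and the copy is assumed to be done with a plain Java serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

public class RegistrationCopySketch {

    /** Hypothetical stand-in for a stateful, Serializable serializer instance. */
    static class StatefulSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        int calls; // mutable state that must not be shared between duplicates
    }

    /** Hypothetical stand-in for KryoRegistration: owns its serializer instance. */
    static class Registration {
        final StatefulSerializer serializer;

        Registration(StatefulSerializer serializer) {
            this.serializer = serializer;
        }
    }

    /** Deep copy through a Java serialization round trip (an assumption of this sketch). */
    static <T extends Serializable> T deepCopy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy serializer", e);
        }
    }

    /** Builds registrations from the config, copying every serializer instance. */
    static Map<String, Registration> buildRegistrations(Map<String, StatefulSerializer> configured) {
        Map<String, Registration> result = new LinkedHashMap<>(configured.size());
        for (Map.Entry<String, StatefulSerializer> entry : configured.entrySet()) {
            result.put(entry.getKey(), new Registration(deepCopy(entry.getValue())));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, StatefulSerializer> config = new LinkedHashMap<>();
        config.put("com.example.Foo", new StatefulSerializer());

        // "Duplicating" is just building the registrations again from the config.
        Map<String, Registration> original = buildRegistrations(config);
        Map<String, Registration> duplicate = buildRegistrations(config);

        original.get("com.example.Foo").serializer.calls++;
        System.out.println(duplicate.get("com.example.Foo").serializer.calls); // prints 0
    }
}
```

With this shape, the copy constructor would not need its own deep-copy loop for the registrations, since every call to the build method already hands out fresh serializer instances.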
---