GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182776735

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
    @@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
          * Copy-constructor that does not copy transient fields. They will be initialized once required.
          */
         protected KryoSerializer(KryoSerializer<T> toCopy) {
    -        defaultSerializers = toCopy.defaultSerializers;
    -        defaultSerializerClasses = toCopy.defaultSerializerClasses;
    -        kryoRegistrations = toCopy.kryoRegistrations;
    +        this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
    +        this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
    +        this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
    +        this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
    +
    +        // deep copy the serializer instances in defaultSerializers
    +        for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
    +            toCopy.defaultSerializers.entrySet()) {
    -        type = toCopy.type;
    -        if(type == null){
    -            throw new NullPointerException("Type class cannot be null.");
    +            this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
    +        }
    +
    +        // deep copy the serializer instances in kryoRegistrations
    +        for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
    --- End diff --

    The problem is that we don't have the `ExecutionConfig` in the copy constructor.
---