GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182750453
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
    @@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
         * Copy-constructor that does not copy transient fields. They will be initialized once required.
         */
        protected KryoSerializer(KryoSerializer<T> toCopy) {
    -           defaultSerializers = toCopy.defaultSerializers;
    -           defaultSerializerClasses = toCopy.defaultSerializerClasses;
     
    -           kryoRegistrations = toCopy.kryoRegistrations;
    +           this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
    +           this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
    +           this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
    +           this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
    +
    +           // deep copy the serializer instances in defaultSerializers
    +           for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
    +                   toCopy.defaultSerializers.entrySet()) {
     
    -           type = toCopy.type;
    -           if(type == null){
    -                   throw new NullPointerException("Type class cannot be null.");
    +                   this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
    +           }
    +
    +           // deep copy the serializer instances in kryoRegistrations
    +           for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
    --- End diff --
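The copy-constructor loop in the diff exists so that a duplicated serializer never shares a stateful serializer instance with the original. Below is a minimal, self-contained sketch of that idea, not Flink code: `CountingSerializer`, `SerializerHolder` (a simplified stand-in for `ExecutionConfig.SerializableSerializer`), `ToySerializer` and `SerializerCopies.deepCopy` are all hypothetical, and the deep copy is assumed to be a plain Java serialization round trip, which may differ from what `deepCopySerializer` actually does.

```java
import java.io.*;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a user-registered, stateful Kryo serializer.
// The mutable field is what makes sharing one instance across duplicates unsafe.
class CountingSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
    int invocations; // mutable per-instance state

    void write(Object value) {
        invocations++;
    }
}

// Hypothetical, simplified analogue of ExecutionConfig.SerializableSerializer: it only wraps the serializer.
class SerializerHolder<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;
    private final T serializer;

    SerializerHolder(T serializer) {
        this.serializer = serializer;
    }

    T getSerializer() {
        return serializer;
    }
}

class SerializerCopies {
    // Assumed deep-copy strategy: write the object with Java serialization and read it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deep copy serializer", e);
        }
    }
}

// Hypothetical, heavily reduced analogue of the copy constructor in the diff.
class ToySerializer {
    final Map<Class<?>, SerializerHolder<?>> defaultSerializers;

    ToySerializer(Map<Class<?>, SerializerHolder<?>> defaultSerializers) {
        this.defaultSerializers = defaultSerializers;
    }

    // Copy constructor: keeps insertion order and gives the copy its own serializer instances.
    ToySerializer(ToySerializer toCopy) {
        this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
        for (Map.Entry<Class<?>, SerializerHolder<?>> entry : toCopy.defaultSerializers.entrySet()) {
            this.defaultSerializers.put(entry.getKey(), SerializerCopies.deepCopy(entry.getValue()));
        }
    }
}
```

Building a `ToySerializer` over a map that holds one `CountingSerializer` and then calling `new ToySerializer(original)` yields a map whose value wraps a different `CountingSerializer` instance, so the two `invocations` counters advance independently.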
    
    One alternative approach to this loop (though I'm not sure it would be better) is that, in the `buildKryoRegistrations` method, we always make a copy of the `ExecutionConfig.SerializableSerializer` when instantiating its corresponding `KryoRegistration`.
    See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537. There we could already make the copy while building the registrations.
    
    Then, when duplicating the `KryoSerializer`, duplicating the registrations would simply be a matter of calling `buildKryoRegistrations` again with the execution config, because that method would already handle stateful serializer registrations properly.
    IMO, this seems like a cleaner solution. What do you think?
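The alternative proposed above moves the copy into registration building, so that duplicating reduces to rebuilding from the config. The sketch below illustrates that under stated assumptions and is not Flink code: `StatefulSerializer`, `Registration` (standing in for `KryoRegistration`), `RegistrationBuilder.buildRegistrations` (standing in for `buildKryoRegistrations`, with simplified parameters that are not the real signature) and `ToyKryoSerializer` are hypothetical, a plain `Map` stands in for the serializers registered in `ExecutionConfig`, and the copy is assumed to be a Java serialization round trip.

```java
import java.io.*;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical user-registered serializer with mutable state.
class StatefulSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
    int invocations; // per-instance state that must not be shared between duplicates
}

// Hypothetical, simplified stand-in for KryoRegistration.
class Registration {
    final Class<?> registeredClass;
    final StatefulSerializer serializer;

    Registration(Class<?> registeredClass, StatefulSerializer serializer) {
        this.registeredClass = registeredClass;
        this.serializer = serializer;
    }
}

class RegistrationBuilder {
    // Assumption: deep copy via a Java serialization round trip.
    static StatefulSerializer deepCopy(StatefulSerializer original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (StatefulSerializer) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy serializer", e);
        }
    }

    // Hypothetical stand-in for buildKryoRegistrations: copies every configured serializer,
    // so each call returns registrations backed by fresh serializer instances.
    static Map<String, Registration> buildRegistrations(Map<Class<?>, StatefulSerializer> configured) {
        Map<String, Registration> registrations = new LinkedHashMap<>();
        for (Map.Entry<Class<?>, StatefulSerializer> entry : configured.entrySet()) {
            Class<?> clazz = entry.getKey();
            registrations.put(clazz.getName(), new Registration(clazz, deepCopy(entry.getValue())));
        }
        return registrations;
    }
}

// Hypothetical, heavily reduced analogue of KryoSerializer.
class ToyKryoSerializer {
    private final Map<Class<?>, StatefulSerializer> config; // stand-in for the ExecutionConfig registrations
    final Map<String, Registration> registrations;

    ToyKryoSerializer(Map<Class<?>, StatefulSerializer> config) {
        this.config = config;
        this.registrations = RegistrationBuilder.buildRegistrations(config);
    }

    // No per-entry copy loop: rebuilding from the config already yields fresh instances.
    ToyKryoSerializer duplicate() {
        return new ToyKryoSerializer(config);
    }
}
```

In this sketch the trade-off is that every build pays for a copy, including the first construction, whereas the copy-constructor approach only copies on duplication; in exchange, the duplication path carries no copy logic of its own.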

