[ 
https://issues.apache.org/jira/browse/FLINK-6482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019409#comment-16019409
 ] 

ASF GitHub Bot commented on FLINK-6482:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3937#discussion_r117710523
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
 ---
    @@ -19,47 +19,65 @@
     package org.apache.flink.api.common.typeutils;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.java.tuple.Tuple2;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.util.Preconditions;
     
     import java.io.IOException;
    -import java.util.Arrays;
    +import java.util.ArrayList;
    +import java.util.List;
     
     /**
      * A {@link TypeSerializerConfigSnapshot} for serializers that has 
multiple nested serializers.
    - * The configuration snapshot consists of the configuration snapshots of 
all nested serializers.
    + * The configuration snapshot consists of the configuration snapshots of 
all nested serializers, and
    + * also the nested serializers themselves.
    + *
    + * <p>Both the nested serializers and the configuration snapshots are 
written as configuration of
    + * composite serializers, so that on restore, the previous serializer may 
be used in case migration
    + * is required.
      */
     @Internal
     public abstract class CompositeTypeSerializerConfigSnapshot extends 
TypeSerializerConfigSnapshot {
     
    -   private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
    +   private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
nestedSerializersWithConfigs;
     
        /** This empty nullary constructor is required for deserializing the 
configuration. */
        public CompositeTypeSerializerConfigSnapshot() {}
     
    -   public 
CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... 
nestedSerializerConfigSnapshots) {
    -           this.nestedSerializerConfigSnapshots = 
Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
    +   public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... 
nestedSerializers) {
    +           Preconditions.checkNotNull(nestedSerializers);
    +
    +           this.nestedSerializersWithConfigs = new 
ArrayList<>(nestedSerializers.length);
    +           TypeSerializerConfigSnapshot configSnapshot;
    +           for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
    +                   configSnapshot = 
nestedSerializer.snapshotConfiguration();
    +                   this.nestedSerializersWithConfigs.add(
    +                           new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
    +                                   nestedSerializer.duplicate(),
    +                                   
Preconditions.checkNotNull(configSnapshot)));
    +           }
        }
     
        @Override
        public void write(DataOutputView out) throws IOException {
                super.write(out);
    -           TypeSerializerUtil.writeSerializerConfigSnapshots(out, 
nestedSerializerConfigSnapshots);
    +           
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(out, 
nestedSerializersWithConfigs);
        }
     
        @Override
        public void read(DataInputView in) throws IOException {
                super.read(in);
    -           nestedSerializerConfigSnapshots = 
TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
    +           this.nestedSerializersWithConfigs =
    +                   
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
getUserCodeClassLoader());
        }
     
    -   public TypeSerializerConfigSnapshot[] 
getNestedSerializerConfigSnapshots() {
    -           return nestedSerializerConfigSnapshots;
    +   public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
getNestedSerializersAndConfigs() {
    +           return nestedSerializersWithConfigs;
        }
     
    -   public TypeSerializerConfigSnapshot 
getSingleNestedSerializerConfigSnapshot() {
    -           return nestedSerializerConfigSnapshots[0];
    +   public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
getSingleNestedSerializerAndConfig() {
    +           return nestedSerializersWithConfigs.get(0);
    --- End diff --
    
    Method name and variable name are not aligned.


> Add nested serializers into configuration snapshots of composite serializers
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-6482
>                 URL: https://issues.apache.org/jira/browse/FLINK-6482
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing, Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> Currently, the composite serializers' configuration snapshots only wrap the 
> config snapshots of nested serializers.
> We should also consider adding serialization of the nested serializers into 
> the config snapshot, so that in the case where only some nested serializer 
> cannot be loaded (class missing / implementation changed), we can also 
> provide a path for serializer upgrades.
> This applies for all composite serializers that have nested serializers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to