dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220909700
 
 

 ##########
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##########
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data 
in the checkpoint.
+ * This serves three purposes:
+ *
+ * <ul>
+ *   <li><strong>Capturing serializer parameters and schema:</strong> a 
serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a 
serializer.
+ *   This is explained in more detail below.</li>
  *
- * <p>The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   <li><strong>Compatibility checks for new serializers:</strong> when new 
serializers are available,
+ *   they need to be checked whether or not they are compatible to read the 
data written by the previous serializer.
+ *   This is performed by providing the new serializer to the corresponding 
serializer configuration
+ *   snapshots in checkpoints.</li>
+ *
+ *   <li><strong>Factory for a read serializer when schema conversion is 
required:</strong> in the case that new
+ *   serializers are not compatible to read previous data, a schema conversion 
process executed across all data
+ *   is required before the new serializer can be continued to be used. This 
conversion process requires a compatible
+ *   read serializer to restore serialized bytes as objects, and then written 
back again using the new serializer.
+ *   In this scenario, the serializer configuration snapshots in checkpoints 
double as a factory for the read
+ *   serializer of the conversion process.</li>
+ * </ul>
+ *
+ * <h2>Serializer Configuration and Schema</h2>
+ *
+ * <p>Since serializer configuration snapshots need to be used to ensure 
serialization compatibility
+ * for the same managed state as well as serving as a factory for compatible 
read serializers, the configuration
+ * snapshot should encode sufficient information about:
  *
  * <ul>
  *   <li><strong>Parameter settings of the serializer:</strong> parameters of 
the serializer include settings
  *   required to setup the serializer, or the state of the serializer if it is 
stateful. If the serializer
  *   has nested serializers, then the configuration snapshot should also 
contain the parameters of the nested
  *   serializers.</li>
  *
- *   <li><strong>Serialization schema of the serializer:</strong> the data 
format used by the serializer.</li>
+ *   <li><strong>Serialization schema of the serializer:</strong> the binary 
format used by the serializer, or
+ *   in other words, the schema of data written by the serializer.</li>
  * </ul>
  *
  * <p>NOTE: Implementations must contain the default empty nullary 
constructor. This is required to be able to
  * deserialize the configuration snapshot from its binary form.
+ *
+ * @param <T> The data type that the originating serializer of this 
configuration serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializerConfigSnapshot extends 
VersionedIOReadableWritable {
+public abstract class TypeSerializerConfigSnapshot<T> extends 
VersionedIOReadableWritable {
 
        /** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
        private ClassLoader userCodeClassLoader;
 
+       /**
+        * The originating serializer of this configuration snapshot.
+        *
+        * TODO to allow for incrementally adapting the implementation of 
serializer config snapshot subclasses,
+        * TODO we currently have a base implementation for the {@link 
#restoreSerializer()}
+        * TODO method which simply returns this serializer instance. The 
serializer is written
+        * TODO and read using Java serialization as part of reading / writing 
the config snapshot
+        */
+       private TypeSerializer<T> serializer;
+
+       /**
+        * Creates a serializer using this configuration, that is capable of 
reading data
+        * written by the serializer described by this configuration.
+        *
+        * @return the restored serializer.
+        */
+       public TypeSerializer<T> restoreSerializer() {
+               // TODO this implementation is only a placeholder; the 
intention is to have no default implementation
+               return serializer;
+       }
+
+       /**
+        * Set the originating serializer of this configuration snapshot.
+        *
+        * TODO this method is a temporary workaround to inject the serializer 
instance to
+        * TODO be returned by the restoreSerializer() method.
+        */
+       @Internal
+       public final void setSerializer(TypeSerializer<T> serializer) {
+               this.serializer = Preconditions.checkNotNull(serializer);
+       }
+
+       /**
+        * Checks whether a new serializer is compatible to read data written 
by the originating serializer of this
+        * config snapshot; i.e. whether or not a new serializer is compatible 
with the previous serializer.
+        *
+        * @param newSerializer the new serializer to check against for schema 
compatibility.
+        *
+        * @return the resolve schema compatibility result.
+        */
+       public TypeSerializerSchemaCompatibility<T> 
resolveSchemaCompatibility(TypeSerializer<?> newSerializer) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<T> castedSerializer = ((TypeSerializer<T>) 
newSerializer);
+
+               if 
(castedSerializer.ensureCompatibility(this).isRequiresMigration()) {
+                       return 
TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+               } else {
+                       return 
TypeSerializerSchemaCompatibility.compatibleAfterReconfiguration(castedSerializer);
+               }
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               // bump the version; we use this to know that there is a 
serializer to read as part of the config
+               out.writeInt(getVersion() + 1);
 
 Review comment:
   OK, so the version will progress as `1 (pre-1.6)` -> `2 (1 + 1) (this PR)` -> 
`3 (3 + 0) (final version)`, right? My concern is that we will have to change the 
version very carefully: for the final version, we would have to increase the 
version of each serializer by 2.
   
   Also, if the final version is released separately from the version in this PR, 
this needs to be explained very carefully to end users.
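
   To make the arithmetic concrete, here is a minimal sketch of the concern. It does not use any Flink classes: `VersionBumpSketch` and all of its methods are hypothetical names, and the rule for the final format (it writes its declared version with no bump) is my assumption based on the `3 + 0` step above, not something shown in this diff.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only; none of these names exist in Flink.
final class VersionBumpSketch {

    // Mirrors the write path in this PR: the version on the wire is
    // the declared version plus one.
    static byte[] writeWithBump(int declaredVersion) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(declaredVersion + 1).array();
    }

    // Assumed final format: the declared version is written unchanged.
    static byte[] writeWithoutBump(int declaredVersion) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(declaredVersion).array();
    }

    // Reads the version back from the first four bytes.
    static int readVersion(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    // True if a final-format snapshot declaring finalDeclared would be
    // indistinguishable, by version alone, from a snapshot written by this PR
    // for a serializer that declared oldDeclared.
    static boolean collidesWithBumped(int finalDeclared, int oldDeclared) {
        return readVersion(writeWithoutBump(finalDeclared))
            == readVersion(writeWithBump(oldDeclared));
    }

    public static void main(String[] args) {
        int preSixteen = 1; // declared version before this PR
        System.out.println("this PR writes: " + readVersion(writeWithBump(preSixteen)));
        System.out.println("final declaring 2 collides: " + collidesWithBumped(2, preSixteen));
        System.out.println("final declaring 3 collides: " + collidesWithBumped(3, preSixteen));
    }
}
```

   Under these assumptions, a serializer that declared version 1 before this PR has to declare 3 in the final version, because 2 is already taken on the wire by the bumped format. That is the "increase by 2" I am worried about.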
