[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627519#comment-16627519
 ] 

ASF GitHub Bot commented on FLINK-9377:
---------------------------------------

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220232529
 
 

 ##########
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
 ##########
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@code TypeSerializerSchemaCompatibility} represents information about 
whether or not a {@link TypeSerializer}
+ * can be safely used to read data written by a previous type serializer.
+ *
+ * <p>Typically, the compatibility of the new serializer is resolved by 
checking it against the snapshotted
+ * {@link TypeSerializerConfigSnapshot} of the previous serializer. Depending 
on the type of the
+ * resolved compatibility result, migration (i.e., reading bytes with the 
previous serializer and then writing
+ * it again with the new serializer) may be required before the new serializer 
can be used.
+ *
+ * @see TypeSerializer
+ * @see TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)
+ */
+@PublicEvolving
+public class TypeSerializerSchemaCompatibility<T> {
+
+       /**
+        * Enum for the type of the compatibility.
+        */
+       enum Type {
+
+               /** This indicates that the new serializer continued to be used 
as is. */
+               COMPATIBLE_AS_IS,
+
+               /**
+                * This indicates that it is required to reconfigure the new 
serializer before
+                * it can be used. The reconfigured serializer should be 
provided as part of the
+                * resolved {@link TypeSerializerSchemaCompatibility} result.
+                */
+               COMPATIBLE_AFTER_RECONFIGURATION,
+
+               /**
+                * This indicates that it is possible to use the new serializer 
after performing a
+                * full-scan migration over all state, by reading bytes with 
the previous serializer
+                * and then writing it again with the new serializer, 
effectively converting the
+                * serialization schema to correspond to the new serializer.
+                */
+               COMPATIBLE_AFTER_MIGRATION,
+
+               /**
+                * This indicates that the new serializer is incompatible, even 
with migration.
+                * This normally implies that the deserialized Java class can 
not be commonly recognized
+                * by the previous and new serializer.
+                */
+               INCOMPATIBLE
+       }
+
+       /**
+        * The type of the compatibility.
+        */
+       private final Type resultType;
+
+       /**
+        * The reconfigured new serializer to use. This is only relevant
+        * in the case that the type of the compatibility is {@link 
Type#COMPATIBLE_AFTER_RECONFIGURATION}.
+        */
+       private final TypeSerializer<T> reconfiguredNewSerializer;
+
+       /**
+        * Returns a result that indicates that the new serializer is 
compatible and no migration is required.
+        * The new serializer can continued to be used as is.
+        *
+        * @return a result that indicates migration is not required for the 
new serializer.
+        */
+       public static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs() 
{
+               return new 
TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS, null);
+       }
+
+       /**
+        * Returns a result that indicates that no migration is required, but 
the new serializer had to be
+        * reconfigured in order for it to be compatible. A reconfigured 
serializer is provided and
+        * should be used instead.
+        *
+        * @param reconfiguredSerializer the reconfigured new serializer that 
should be used.
+        *
+        * @return a result that indicates migration is not required, but a 
reconfigured version of the new
+        * serializer should be used.
+        */
+       public static <T> TypeSerializerSchemaCompatibility<T> 
compatibleAfterReconfiguration(TypeSerializer<T> reconfiguredSerializer) {
+               return new 
TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_RECONFIGURATION, 
reconfiguredSerializer);
+       }
+
+       /**
+        * Returns a result that indicates that the new serializer can be used 
after migrating the written bytes, i.e.
+        * reading it with the old serializer and then writing it again with 
the new serializer.
+        *
+        * @return a result that indicates that the new serializer can be used 
after migrating the written bytes.
+        */
+       public static <T> TypeSerializerSchemaCompatibility<T> 
compatibleAfterMigration() {
+               return new 
TypeSerializerSchemaCompatibility<T>(Type.COMPATIBLE_AFTER_MIGRATION, null);
+       }
+
+       /**
+        * Returns a result that indicates there is no possible way for the new 
serializer to be use-able.
+        * This normally indicates that there is no common Java class between 
what the previous bytes can be
+        * deserialized into and what can be written by the new serializer.
 
 Review comment:
   Shouldn't we explain what will be the effect of it? That the recovery will 
fail with an exception? Or do you plan to provide some other possible options?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> ---------------------------------------------------------------------
>
>                 Key: FLINK-9377
>                 URL: https://issues.apache.org/jira/browse/FLINK-9377
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run, already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to