Tzu-Li (Gordon) Tai created FLINK-11591:
-------------------------------------------

             Summary: Restoring LockableTypeSerializer's snapshot from 1.6 and 
below requires pre-processing before snapshot is valid to use
                 Key: FLINK-11591
                 URL: https://issues.apache.org/jira/browse/FLINK-11591
             Project: Flink
          Issue Type: Bug
          Components: CEP, Type Serialization System
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Igal Shilman


In 1.6 and below, the {{LockableTypeSerializer}} incorrectly returns directly 
the element serializer's snapshot instead of wrapping it within an independent 
snapshot class:
https://github.com/apache/flink/blob/release-1.6/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java#L188

This results in the fact that the the written state information for this would 
be {{(LockableTypeSerializer, SomeArbitrarySnapshot)}}.

The problem occurs when restoring this in Flink 1.7+, since compatibility 
checks are now performed by providing the new serializer to the snapshot, what 
would happen is:
{{SomeArbitrarySnapshot.resolveSchemaCompatibility(newLockableTypeSerializer)}},
 which would not work since the arbitrary snapshot does not recognize the 
{{LockableTypeSerializer}}.

To fix this, we essentially need to preprocess that arbitrary snapshot when 
restoring from <= 1.6 version snapshots.

A proposed fix would be to have the following interface:
{code}
public interface RequiresLegacySerializerSnapshotPreprocessing<T> {
    TypeSerializerSnapshot<T> 
preprocessLegacySerializerSnapshot(TypeSerializerSnapshot<T> legacySnapshot)
}
{code}

The {{LockableTypeSerializer}} would then implement this interface, and in the 
{{preprocessLegacySerializerSnapshot}} method, properly wrap that arbitrary 
element serializer snapshot into a {{LockableTypeSerializerSnapshot}}.

In general, this interface is useful to preprocess any problematic snapshot 
that was returned pre 1.7.

The point-in-time to check if a written serializer in <= 1.6 savepoints 
implements this interface and preprocesses the read snapshot would be:
https://github.com/apache/flink/blob/a567a1ef628eadad21e11864ec328481cd6d7898/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java#L218



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to