Felipe Silvestre Santos de Morais created FLINK-20890:
---------------------------------------------------------

             Summary: flink-state-processor-api: different serializers used by Flink and the State Processor API
                 Key: FLINK-20890
                 URL: https://issues.apache.org/jira/browse/FLINK-20890
             Project: Flink
          Issue Type: Bug
          Components: API / State Processor
    Affects Versions: 1.12.0
            Reporter: Felipe Silvestre Santos de Morais
             Fix For: 1.12.0
         Attachments: flinkschemaevolution2.zip

When a savepoint is triggered for a regular Flink job with a keyed function, 
the key is serialized with 
{noformat}
flink.api.common.typeutils.base.IntSerializer{noformat}
and the value is serialized with
{noformat}
flink.api.scala.typeutils.ScalaCaseClassSerializerSnapshot{noformat}
 

When the savepoint is loaded with the State Processor API, transformed, and 
written back to disk, a different serializer is used for the key:

{noformat}
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot{noformat}

When the savepoint written by the State Processor API is then loaded, the 
restore fails with the following exception:

 
{code:java}
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer (org.apache.flink.api.common.typeutils.base.IntSerializer@11a7ba62) must be compatible with the previous key serializer (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@3b56cc30).
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:147)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 15 more
{code}
 

The State Processor API should use the same serializer as the regular Flink 
job, since the type is exactly the same.
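
To make the failure mode concrete, here is a minimal, self-contained sketch of the kind of check that rejects the rewritten savepoint. It is a toy model, not Flink code: the class KeySerializerCheckSketch, the SnapshotSketch type, and the isCompatible/restore helpers are all hypothetical names, and the comparison is reduced to a class-name match. As far as I understand, the real check goes through TypeSerializerSnapshot#resolveSchemaCompatibility and is more nuanced than this.

```java
import java.util.Objects;

/** Toy model (NOT Flink code) of a key-serializer compatibility check on restore. */
public class KeySerializerCheckSketch {

    /** Hypothetical stand-in for the serializer snapshot stored in a savepoint. */
    static final class SnapshotSketch {
        final String serializerClassName;

        SnapshotSketch(String serializerClassName) {
            this.serializerClassName = Objects.requireNonNull(serializerClassName);
        }
    }

    /** Assumption: compatibility is simplified to "same serializer class name". */
    static boolean isCompatible(SnapshotSketch previous, String newSerializerClassName) {
        return previous.serializerClassName.equals(newSerializerClassName);
    }

    /** Mimics a restore that rejects an incompatible key serializer. */
    static void restore(SnapshotSketch previous, String newSerializerClassName) {
        if (!isCompatible(previous, newSerializerClassName)) {
            throw new IllegalStateException(
                    "The new key serializer (" + newSerializerClassName
                            + ") must be compatible with the previous key serializer ("
                            + previous.serializerClassName + ").");
        }
    }

    public static void main(String[] args) {
        String intSerializer = "org.apache.flink.api.common.typeutils.base.IntSerializer";
        String kryoSerializer = "org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer";

        // Savepoint taken by the regular job: the recorded key serializer matches the job's.
        restore(new SnapshotSketch(intSerializer), intSerializer);
        System.out.println("original savepoint: restore ok");

        // Savepoint rewritten with a Kryo key serializer: the same job can no longer restore it.
        try {
            restore(new SnapshotSketch(kryoSerializer), intSerializer);
        } catch (IllegalStateException e) {
            System.out.println("rewritten savepoint: " + e.getMessage());
        }
    }
}
```

In this model the first restore passes and the second is rejected. That mirrors what happens above: the job still expects IntSerializer for the key, but the rewritten savepoint recorded a Kryo serializer.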

 

 

I have attached a zip file containing the code to reproduce the issue.

The zipped project also includes the source and rewritten savepoints.

 

Note:

I have tried toggling the enableX/disableX serializer settings, but without success so far.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
