[ 
https://issues.apache.org/jira/browse/FLINK-22608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arvid Heise closed FLINK-22608.
-------------------------------
    Resolution: Invalid

Hi [~Autumn],

Please check [Supported data types for schema 
evolution|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#supported-data-types-for-schema-evolution].
 Kryo is not supported and probably never will. 

The main question is why Kryo is used for POJO in the first place: please check 
the 
[requirements|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#pojos]
 and your logs for warning.

It usually makes sense to first ask on [user mailing 
list|https://flink.apache.org/gettinghelp.html#user-mailing-list] if it's 
indeed a bug. You usually get a response within 1 day and we can help to find 
workarounds. A jira ticket can always be created later.

> Flink Kryo deserialize read wrong bytes
> ---------------------------------------
>
>                 Key: FLINK-22608
>                 URL: https://issues.apache.org/jira/browse/FLINK-22608
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.12.0
>            Reporter: Si Chen
>            Priority: Major
>              Labels: stale-major
>
> In flink program, I use ValueState to save my state. The state stores pojo. 
> But my pojo used kryo serializer. As the program run some time, I add a field 
> in pojo. Then recovery the program with checkpoint. I found the value of the 
> field incorrect. Then I read the source code I found
>  
> {code:java}
> //代码占位符
> org.apache.flink.runtime.state.heap.HeapRestoreOperation#readStateHandleStateData
> private void readStateHandleStateData(
>    FSDataInputStream fsDataInputStream,
>    DataInputViewStreamWrapper inView,
>    KeyGroupRangeOffsets keyGroupOffsets,
>    Map<Integer, StateMetaInfoSnapshot> kvStatesById,
>    int numStates,
>    int readVersion,
>    boolean isCompressed) throws IOException {
>    final StreamCompressionDecorator streamCompressionDecorator = isCompressed 
> ?
>       SnappyStreamCompressionDecorator.INSTANCE : 
> UncompressedStreamCompressionDecorator.INSTANCE;
>    for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
>       int keyGroupIndex = groupOffset.f0;
>       long offset = groupOffset.f1;
>       // Check that restored key groups all belong to the backend.
>       Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The 
> key group must belong to the backend.");
>       fsDataInputStream.seek(offset);
>       int writtenKeyGroupIndex = inView.readInt();
>       Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
>          "Unexpected key-group in restore.");
>       try (InputStream kgCompressionInStream =
>              
> streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
>          readKeyGroupStateData(
>             kgCompressionInStream,
>             kvStatesById,
>             keyGroupIndex,
>             numStates,
>             readVersion);
>       }
>    }
> }
> {code}
> my state keyGroupIndex is 81, and keyGroupOffset is 3572. And the next 
> keyGroupOffset is 3611. So my state offset rang is 3572 to 3611. But when I 
> add new field in pojo. Kryo will read more bytes in the next keyGroup.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to