[ https://issues.apache.org/jira/browse/FLINK-22608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arvid Heise closed FLINK-22608.
-------------------------------
    Resolution: Invalid

Hi [~Autumn],

Please check [Supported data types for schema evolution|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#supported-data-types-for-schema-evolution]. Kryo is not supported and probably never will be.

The main question is why Kryo is used for the POJO in the first place: please check the [requirements|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#pojos] and your logs for warnings.

It usually makes sense to first ask on the [user mailing list|https://flink.apache.org/gettinghelp.html#user-mailing-list] whether it is indeed a bug. You usually get a response within a day, and we can help find workarounds. A Jira ticket can always be created later.

> Flink Kryo deserialize reads wrong bytes
> ----------------------------------------
>
>                 Key: FLINK-22608
>                 URL: https://issues.apache.org/jira/browse/FLINK-22608
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.12.0
>            Reporter: Si Chen
>            Priority: Major
>              Labels: stale-major
>
> In my Flink program I use ValueState to store a POJO, but the POJO is serialized with Kryo. After the program had run for some time, I added a field to the POJO and then restored the program from a checkpoint. I found that the value of the field was incorrect. Reading the source code, I found:
>
> {code:java}
> // org.apache.flink.runtime.state.heap.HeapRestoreOperation#readStateHandleStateData
> private void readStateHandleStateData(
>         FSDataInputStream fsDataInputStream,
>         DataInputViewStreamWrapper inView,
>         KeyGroupRangeOffsets keyGroupOffsets,
>         Map<Integer, StateMetaInfoSnapshot> kvStatesById,
>         int numStates,
>         int readVersion,
>         boolean isCompressed) throws IOException {
>     final StreamCompressionDecorator streamCompressionDecorator = isCompressed
>             ? SnappyStreamCompressionDecorator.INSTANCE
>             : UncompressedStreamCompressionDecorator.INSTANCE;
>     for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
>         int keyGroupIndex = groupOffset.f0;
>         long offset = groupOffset.f1;
>         // Check that restored key groups all belong to the backend.
>         Preconditions.checkState(
>                 keyGroupRange.contains(keyGroupIndex),
>                 "The key group must belong to the backend.");
>         fsDataInputStream.seek(offset);
>         int writtenKeyGroupIndex = inView.readInt();
>         Preconditions.checkState(
>                 writtenKeyGroupIndex == keyGroupIndex,
>                 "Unexpected key-group in restore.");
>         try (InputStream kgCompressionInStream =
>                 streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
>             readKeyGroupStateData(
>                     kgCompressionInStream,
>                     kvStatesById,
>                     keyGroupIndex,
>                     numStates,
>                     readVersion);
>         }
>     }
> }
> {code}
> My state's keyGroupIndex is 81 and its keyGroupOffset is 3572; the next keyGroupOffset is 3611, so my state's offset range is 3572 to 3611. But after I add a new field to the POJO, Kryo reads extra bytes that belong to the next key group.


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
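The failure mode described in the ticket can be reproduced without Flink or Kryo at all. The sketch below is hypothetical (the class and method names are made up, and plain `DataOutputStream` stands in for Kryo): one record is written under the old schema, immediately followed by the next key group's bytes, and a reader expecting the new schema with an extra `long` field runs past the record boundary and consumes bytes that belong to the next key group:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch: a record is written with the OLD schema (just an int id),
// followed immediately by the next key group's data. A reader that expects the
// NEW schema (int id + long extra) has no record boundary to stop at, so the
// extra readLong() consumes bytes owned by the next key group.
public class SchemaDriftSketch {

    public static long readExtraWithNewSchema() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(42);   // key group "81": old-schema record ends here
        out.writeInt(99);   // first bytes of the NEXT key group
        out.writeLong(7L);

        DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        int id = in.readInt();    // 42 -- still read correctly
        return in.readLong();     // reads 8 bytes past the record boundary
    }

    public static void main(String[] args) throws IOException {
        // Prints 425201762304 (= 99L << 32): a garbage value assembled from
        // the next key group's bytes, never actually written anywhere.
        System.out.println(readExtraWithNewSchema());
    }
}
```

This is why the restore succeeds but yields wrong values: the seek offsets stored per key group are still valid, but within a key group nothing stops a deserializer whose expected layout no longer matches the written bytes.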
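On the question of why Kryo is used for the POJO in the first place: if a class violates any of Flink's POJO rules, Flink falls back to Kryo (the logs mention the type being handled as a generic type), and schema evolution support is lost. Below is a minimal sketch of a class shaped to satisfy those rules; the class name and fields are invented for illustration:

```java
// Hypothetical example class shaped to meet Flink's POJO requirements, so the
// POJO serializer (which supports schema evolution) can be chosen over Kryo:
//  - the class is public and standalone (not a non-static inner class),
//  - it has a public no-argument constructor,
//  - every field is public, or private with a public getter and setter.
public class SensorReading {

    public String sensorId;    // public field: accessed directly

    private long timestamp;    // private field: needs the getter/setter below

    public SensorReading() {}  // required no-arg constructor

    public long getTimestamp() { return timestamp; }

    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
```

Checking this before the first savepoint matters, because once state has been written with Kryo, later fixing the class to be a valid POJO does not make the already-written state evolvable.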