[ 
https://issues.apache.org/jira/browse/FLINK-39248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan closed FLINK-39248.
-------------------------------------
    Resolution: Fixed

Fixed in 360aa9709bbe25e2673e04e17a374d90480ce967.

> SinkUpsertMaterializerV2 deserialization can fail
> -------------------------------------------------
>
>                 Key: FLINK-39248
>                 URL: https://issues.apache.org/jira/browse/FLINK-39248
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.3.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.0
>
>
> Because the data is read with InputStream#read instead of readFully, a
> short read can leave the buffer only partially filled, and deserialization
> might fail with:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:360)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:299)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:120)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:884)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:836)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:836)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:787)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1112)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1072)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:887)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:683)
>       at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SinkUpsertMaterializerV2_10f3bc77c24e457cd85e23da85a2136d_(1/1) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:489)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:220)
>       ... 12 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>       at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:511)
>       at org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:566)
>       at org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:100)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:478)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       ... 14 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1793)
>       at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2618)
>       at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2469)
>       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2284)
>       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1762)
>       at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:540)
>       at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:498)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:472)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:467)
>       at org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerSnapshot.restore(RowDataKeySerializerSnapshot.java:97)
>       at org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerSnapshot.readSnapshot(RowDataKeySerializerSnapshot.java:71)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:167)
>       at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:136)
>       at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:170)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:167)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:162)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:123)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:62)
>       at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:162)
>       at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:176)
>       at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.readMetaData(RocksDBIncrementalRestoreOperation.java:962)
>       at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.readMetaData(RocksDBIncrementalRestoreOperation.java:945)
>       at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restoreBaseDBFromLocalState(RocksDBIncrementalRestoreOperation.java:746)
>       at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(RocksDBIncrementalRestoreOperation.java:417)
>       at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:355)
>       at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.lambda$restore$1(RocksDBIncrementalRestoreOperation.java:270)
>       at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:931)
>       at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:269)
>       at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:412)
>       ... 19 more
> {code}
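
The read versus readFully difference named in the description can be reproduced in isolation. The following is a minimal sketch, not the Flink code touched by the fix: ReadFullyDemo, ChunkedInputStream, readOnce and readAll are hypothetical names made up for illustration. It assumes an underlying stream that returns fewer bytes per call than requested, which InputStream#read is allowed to do. The unread tail of the buffer stays zero, which would be consistent with the "invalid type code: 00" reported above, although that link is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class ReadFullyDemo {

    /** Hypothetical stream that hands out at most `chunk` bytes per read call. */
    static final class ChunkedInputStream extends FilterInputStream {
        private final int chunk;

        ChunkedInputStream(InputStream in, int chunk) {
            super(in);
            this.chunk = chunk;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // Legal per the InputStream contract: return fewer bytes than asked for.
            return super.read(b, off, Math.min(len, chunk));
        }
    }

    /** Buggy pattern: a single read call whose return value is ignored. */
    static byte[] readOnce(InputStream in, int length) throws IOException {
        byte[] buf = new byte[length];
        in.read(buf); // may fill only a prefix; the rest of buf stays 0
        return buf;
    }

    /** Safe pattern: readFully loops until the buffer is full or throws EOFException. */
    static byte[] readAll(InputStream in, int length) throws IOException {
        byte[] buf = new byte[length];
        new DataInputStream(in).readFully(buf);
        return buf;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = {1, 2, 3, 4, 5, 6, 7, 8};

        byte[] partial = readOnce(
                new ChunkedInputStream(new ByteArrayInputStream(payload), 3), payload.length);
        byte[] full = readAll(
                new ChunkedInputStream(new ByteArrayInputStream(payload), 3), payload.length);

        System.out.println(Arrays.toString(partial)); // [1, 2, 3, 0, 0, 0, 0, 0]
        System.out.println(Arrays.toString(full));    // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

A short read of this kind depends on the stream implementation and its buffering, which may explain why the failure is intermittent ("can fail") rather than deterministic.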



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
