[
https://issues.apache.org/jira/browse/FLINK-39248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roman Khachatryan closed FLINK-39248.
-------------------------------------
Resolution: Fixed
Fixed in 360aa9709bbe25e2673e04e17a374d90480ce967.
> SinkUpsertMaterializerV2 deserialization can fail
> -------------------------------------------------
>
> Key: FLINK-39248
> URL: https://issues.apache.org/jira/browse/FLINK-39248
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.3.0
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.3.0
>
>
> Due to the use of input stream read instead of read fully, deserialization
> might fail with
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:360)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:299)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:120)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:884)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:836)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:836)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:787)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1112)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1072)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:887)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:683)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> SinkUpsertMaterializerV2_10f3bc77c24e457cd85e23da85a2136d_(1/1) from any of
> the 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:489)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:220)
> ... 12 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.
> at
> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:511)
> at
> org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:566)
> at
> org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:478)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> ... 14 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
> at
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1793)
> at
> java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2618)
> at
> java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2469)
> at
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2284)
> at
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1762)
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:540)
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:498)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:472)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:467)
> at
> org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerSnapshot.restore(RowDataKeySerializerSnapshot.java:97)
> at
> org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerSnapshot.readSnapshot(RowDataKeySerializerSnapshot.java:71)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:167)
> at
> org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:136)
> at
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:170)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:167)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:162)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:123)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:62)
> at
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:162)
> at
> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:176)
> at
> org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.readMetaData(RocksDBIncrementalRestoreOperation.java:962)
> at
> org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.readMetaData(RocksDBIncrementalRestoreOperation.java:945)
> at
> org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restoreBaseDBFromLocalState(RocksDBIncrementalRestoreOperation.java:746)
> at
> org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(RocksDBIncrementalRestoreOperation.java:417)
> at
> org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:355)
> at
> org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.lambda$restore$1(RocksDBIncrementalRestoreOperation.java:270)
> at
> org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:931)
> at
> org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:269)
> at
> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:412)
> ... 19 more
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)