Hi,
I have followed the steps below to restart a Flink job from a newly modified
savepoint.
I can restart a job from the new savepoint as long as the Flink state is
expressed in Java primitives. When the state is expressed in a POJO, the job
does not restart, and I get the exceptions below.
Any ideas? Do I need to redefine any serializers?
Thank you very much in advance for your help.
Regards,
Min
----------------------------------- Flink exceptions -----------------------------------
2021-07-07 19:02:58
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
	at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
	at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
	at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
	at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
	at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
	at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
	at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 15 more
From: JING ZHANG <[email protected]>
Sent: 29 June 2021 07:45
To: Marco Villalobos <[email protected]>
Cc: user <[email protected]>
Subject: [External] Re: State Processor API and existing state
Hi Marco,
> I assume that all the data within the checkpoint are stored within the given
> Savepoint. Is that assumption correct?
Yes
> I have not figured out how to correct / augment / fix the state though. Can
> somebody please explain?
Please try it this way:
1. Load the old savepoint file, creating Savepoint obj1.
2. Read the state of the operator with UID Y from the Savepoint obj1 returned
in step 1.
3. Create a `BootstrapTransformation` via the entry point class
`OperatorTransformation`: bootstrap the new operator state with the dataset
returned by step 2, correcting or fixing the old state of operator Y in a
`StateBootstrapFunction` or `KeyedStateBootstrapFunction`.
4. Load the old savepoint file again, creating Savepoint obj2.
5. Drop the old operator with UID Y by calling `removeOperator` on the
Savepoint obj2 returned in step 4.
6. Add a new operator with UID Y by calling `withOperator` on Savepoint obj2;
the first parameter is the uid (Y), the second is the `BootstrapTransformation`
returned by step 3.
7. Write out Savepoint obj2 to a new path.
This way, in the new savepoint files, the states of the operators with UIDs
W, X, and Z are intact; only the state of operator Y is updated.
Details on reading/writing/modifying savepoints can be found in the
documentation [1].
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/
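The seven steps above can be sketched with the State Processor API roughly as
follows. This is a minimal, untested sketch against the Flink 1.13
DataSet-based API; the `Account` POJO, `AccountReader` (a
`KeyedStateReaderFunction`), `FixAccountFunction`, and `AccountBootstrapper`
(a `KeyedStateBootstrapFunction`) are hypothetical placeholders, as are the
savepoint paths:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

public class FixOperatorYState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-2: load the old savepoint and read operator Y's keyed state.
        ExistingSavepoint oldSavepoint =
                Savepoint.load(env, "s3://savepoint-1", new MemoryStateBackend());
        DataSet<Account> oldState =
                oldSavepoint.readKeyedState("Y", new AccountReader());

        // Step 3: correct the state, then bootstrap a replacement operator.
        DataSet<Account> fixedState = oldState.map(new FixAccountFunction());
        BootstrapTransformation<Account> transformation = OperatorTransformation
                .bootstrapWith(fixedState)
                .keyBy(account -> account.id)
                .transform(new AccountBootstrapper());

        // Steps 4-7: load the savepoint again, swap out operator Y,
        // and write the result to a new path.
        Savepoint.load(env, "s3://savepoint-1", new MemoryStateBackend())
                .removeOperator("Y")
                .withOperator("Y", transformation)
                .write("s3://savepoint-2");

        env.execute("fix-operator-Y-state");
    }
}
```

Operators W, X, and Z stay intact because `removeOperator`/`withOperator` only
replace the entry for UID Y; the rest of the savepoint is carried over as-is.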
Best regards,
JING ZHANG
Marco Villalobos <[email protected]> wrote on Tue, 29 Jun 2021 at 06:00:
Let's say that a job has operators with UIDs W, X, Y, and Z, and uses RocksDB
as a backend with checkpoint data URI s3://checkpoints.
Then I stop the job with a savepoint at s3://savepoint-1.
I assume that all the data within the checkpoint are stored within the given
Savepoint. Is that assumption correct?
Then, how can I fix the state in operator with UID Y, but keep all the data in
the other operators intact?
I know how to bootstrap state with the state-processor API.
I have not figured out how to correct / augment / fix the state though.
Can somebody please explain?