Hi,

I have followed the steps below to restart a Flink job from newly modified 
savepoints.

I can restart a job from a new savepoint as long as the Flink state is 
expressed in Java primitives.
When the state is expressed as a POJO, the job does not restart, and I get 
the exceptions below.

Any ideas? Do I need to redefine any serializers?
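
For reference, the keyed state is held in a POJO along these lines (the class and field names here are illustrative, not the actual ones). My understanding is that Flink only uses its PojoSerializer when the class meets the POJO rules; otherwise it silently falls back to Kryo, whose serializer snapshot is what shows up in the trace below:

```java
// Illustrative POJO (hypothetical names, not the real job's class).
// Flink's PojoSerializer is used only if the class is public, has a
// public no-argument constructor, and every field is either public or
// reachable through a public getter/setter; otherwise Flink falls back
// to Kryo for (de)serializing the state.
public class TradeState {
    public String tradeId;  // public field: fine for the POJO serializer
    public long quantity;

    public TradeState() {}  // required public no-arg constructor

    public TradeState(String tradeId, long quantity) {
        this.tradeId = tradeId;
        this.quantity = quantity;
    }
}
```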

Thank you very much for your help in advance.
Regards,

Min


----------------------------------- Flink exceptions -----------------------------------
2021-07-07 19:02:58
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
    at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
    at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
    at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
    at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 15 more



From: JING ZHANG <beyond1...@gmail.com>
Sent: 29 June 2021 07:45
To: Marco Villalobos <mvillalo...@kineteque.com>
Cc: user <user@flink.apache.org>
Subject: [External] Re: State Processor API and existing state

Hi Marco,
> I assume that all the data within the checkpoint are stored within the given 
> Savepoint. Is that assumption correct?
Yes
> I have not figured out how to correct / augment / fix the state though. Can 
> somebody please explain?
Please try this approach:
1. Load the old savepoint file, creating Savepoint obj1.
2. Read the state of the operator with UID Y from obj1.
3. Create a `BootstrapTransformation` via the entry-point class 
`OperatorTransformation`: bootstrap the new operator state with the dataset 
returned by step 2, correcting or fixing the old state of operator Y in a 
`StateBootstrapFunction` or `KeyedStateBootstrapFunction`.
4. Load the old savepoint file again, creating Savepoint obj2.
5. Drop the old operator with UID Y by calling `removeOperator` on obj2.
6. Add a new operator with UID Y by calling `withOperator` on obj2; the first 
parameter is the uid (Y), the second is the `BootstrapTransformation` returned 
by step 3.
7. Write obj2 out to a new path.

In this way, the new savepoint leaves the states of the operators with UIDs W, 
X, and Z intact; only the state of operator Y is updated.
Details on reading/writing/modifying savepoints can be found in the documentation[1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/
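
The steps above might be sketched roughly as follows against the Flink 1.13 State Processor API. The paths are assumptions, and `AccountState`, `ReaderFunction` (a `KeyedStateReaderFunction`), and `FixFunction` (a `KeyedStateBootstrapFunction` that applies the correction) are illustrative placeholders for your own classes:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

public class FixSavepoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-2: load the old savepoint and read operator Y's keyed state.
        ExistingSavepoint oldSavepoint =
            Savepoint.load(env, "s3://savepoint-1", new FsStateBackend("s3://checkpoints"));
        DataSet<AccountState> oldState =
            oldSavepoint.readKeyedState("Y", new ReaderFunction());

        // Step 3: bootstrap corrected state in a KeyedStateBootstrapFunction.
        BootstrapTransformation<AccountState> fixed = OperatorTransformation
            .bootstrapWith(oldState)
            .keyBy(s -> s.key)
            .transform(new FixFunction());

        // Steps 4-7: load the savepoint again, swap operator Y, write new path.
        Savepoint.load(env, "s3://savepoint-1", new FsStateBackend("s3://checkpoints"))
            .removeOperator("Y")
            .withOperator("Y", fixed)
            .write("s3://savepoint-2");

        env.execute("fix-savepoint");
    }
}
```

The job the savepoint belongs to is then resumed with `flink run -s s3://savepoint-2 ...`, and the states of the untouched operators are carried over verbatim.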

Best regards,
JING ZHANG

Marco Villalobos <mvillalo...@kineteque.com<mailto:mvillalo...@kineteque.com>> 
wrote on Tue, 29 Jun 2021 at 06:00:
Let's say that a job has operators with UIDs W, X, Y, and Z, and uses RocksDB 
as a backend with checkpoint data URI s3://checkpoints.

Then I stop the job with a savepoint at s3://savepoint-1.

I assume that all the data within the checkpoint are stored within the given 
Savepoint. Is that assumption correct?

Then, how can I fix the state in operator with UID Y, but keep all the data in 
the other operators intact?

I know how to bootstrap state with the state-processor API.
I have not figured out how to correct / augment / fix the state though.

Can somebody please explain?
