RE: State Processor API and existing state

2021-07-08 Thread Tan, Min
Many thanks for your prompt reply.

Yes, they are the same operators.
What I did was just modify the content of the POJO, e.g., doubling amount fields.
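
As an illustration, the kind of change described above could look like this (the `Payment` class and its fields are hypothetical stand-ins, not the production POJO):

```java
// Hypothetical POJO standing in for the production state class.
class Payment {
    public String id;
    public double amount;

    // Public no-argument constructor, as Flink's POJO rules require.
    public Payment() {}

    public Payment(String id, double amount) {
        this.id = id;
        this.amount = amount;
    }

    // The modification described above: doubling the amount field.
    static Payment doubleAmount(Payment p) {
        p.amount *= 2.0;
        return p;
    }
}
```

In the real job this transformation would run inside a `KeyedStateBootstrapFunction` against the state read from the savepoint, rather than as a static helper.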

I am not able to send the production code, but I will create a separate mocked
project to reproduce the issue and send you the mocked code later.

Regards,
Min

From: JING ZHANG 
Sent: 08 July 2021 04:45
To: Tan, Min 
Cc: Marco Villalobos ; user 
Subject: [External] Re: State Processor API and existing state

Hi Min,
Is the POJO state in an existing operator or a newly added operator?
By the way, it would be great if you could share code that reproduces the
exception. I need to debug further based on the code to find the cause.


Tan, Min <min@ubs.com> wrote on Thu, Jul 8, 2021 at 2:56 AM:
Hi,

I have followed the steps below to restart a Flink job with newly modified
savepoints.

I can restart a job from new savepoints as long as the Flink state is
expressed in Java primitives.
When the Flink state is expressed in a POJO, my job does not restart; I get
the following exceptions.

Any ideas? Do I need to redefine any serializers?
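
One general point worth checking (a standard Flink requirement, not a confirmed diagnosis of this exception): Flink only uses its `PojoSerializer` for classes that follow its POJO rules; anything else silently falls back to Kryo, which is much more fragile when savepoints are rewritten, and the stack trace below does show a `KryoSerializerSnapshot`. A class satisfying the rules looks roughly like this (in a real job the class must also be declared `public`; it is left package-private here only so the snippet is a single self-contained file):

```java
// A class Flink can serialize as a POJO: standalone (not a non-static inner
// class), with a public no-argument constructor, and with every field either
// public or exposed via getters/setters.
class Account {
    public String owner;
    public double amount;

    public Account() {}   // required no-arg constructor
}
```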

Thank you very much for your help in advance.
Regards,

Min


----- Flink exceptions -----
2021-07-07 19:02:58
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.ja

Re: State Processor API and existing state

2021-07-07 Thread JING ZHANG
> at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
> at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
> at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
> at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
> at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
> at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
> at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
> at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
> at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
> ... 15 more
>
> *From:* JING ZHANG 
> *Sent:* 29 June 2021 07:45
> *To:* Marco Villalobos 
> *Cc:* user 
> *Subject:* [External] Re: State Processor API and existing state
>
>
>
> Hi Marco,
>
> > I assume that all the data within the checkpoint are stored within the
> given Savepoint. Is that assumption correct?
>
> Yes
>
> > I have not figured out how to correct / augment / fix the state though.
> Can somebody please explain?
>
> Please try this way.
>
> 1. Load the old savepoint file to create Savepoint obj1.
>
> 2. Read the state of the operator with UID Y from the Savepoint obj1 returned by step 1.
>
> 3. Create a `BootstrapTransformation` via the entry-point class `OperatorTransformation`: bootstrap the new operator state with the dataset returned by step 2, correcting or fixing the old state of operator UID Y in a `StateBootstrapFunction` or `KeyedStateBootstrapFunction`.
>
> 4. Load the old savepoint file again to create Savepoint obj2.
>
> 5. Drop the old operator with UID Y by calling `removeOperator` on the Savepoint obj2 returned by step 4.
>
> 6. Add a new operator with UID Y by calling `withOperator` on the Savepoint obj2 returned by step 4; the first parameter is the uid (Y), the second is the `BootstrapTransformation` returned by step 3.
>
> 7. Write out the Savepoint obj2 returned by step 6 to a new path.
>
>
>
> This way, the states of the operators with UIDs W, X, and Z are intact in the new savepoint files; only the state of operator Y is updated.
>
> Details about reading, writing, and modifying savepoints can be found in the documentation [1].
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/
>
>
>
> Best regards,
>
> JING ZHANG
>
>
>
Marco Villalobos wrote on Tue, Jun 29, 2021 at 6:00 AM:
>
> Let's say that a job has operators with UIDs W, X, Y, and Z, and uses
> RocksDB as a backend with checkpoint data URI "s3://checkpoints".
>
>
>
> Then I stop the job with a savepoint at s3://savepoint-1.
>
>
>
> I assume that all the data within the checkpoint are stored within the
> given Savepoint. Is that assumption correct?
>
>
>
> Then, how can I fix the state in operator with UID Y, but keep all the
> data in the other operators intact?
>
>
>
> I know how to bootstrap state with the state-processor API.
>
> I have not figured out how to correct / augment / fix the state though.
>
>
>
> Can somebody please explain?
>
>


RE: State Processor API and existing state

2021-07-07 Thread Tan, Min
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more



From: JING ZHANG 
Sent: 29 June 2021 07:45
To: Marco Villalobos 
Cc: user 
Subject: [External] Re: State Processor API and existing state

Hi Marco,
> I assume that all the data within the checkpoint are stored within the given 
> Savepoint. Is that assumption correct?
Yes
> I have not figured out how to correct / augment / fix the state though. Can 
> somebody please explain?
Please try this way:
1. Load the old savepoint file to create Savepoint obj1.
2. Read the state of the operator with UID Y from the Savepoint obj1 returned by step 1.
3. Create a `BootstrapTransformation` via the entry-point class `OperatorTransformation`: bootstrap the new operator state with the dataset returned by step 2, correcting or fixing the old state of operator UID Y in a `StateBootstrapFunction` or `KeyedStateBootstrapFunction`.
4. Load the old savepoint file again to create Savepoint obj2.
5. Drop the old operator with UID Y by calling `removeOperator` on the Savepoint obj2 returned by step 4.
6. Add a new operator with UID Y by calling `withOperator` on the Savepoint obj2 returned by step 4; the first parameter is the uid (Y), the second is the `BootstrapTransformation` returned by step 3.
7. Write out the Savepoint obj2 returned by step 6 to a new path.

This way, the states of the operators with UIDs W, X, and Z are intact in the new savepoint files; only the state of operator Y is updated.
Details about reading, writing, and modifying savepoints can be found in the documentation [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/
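
The steps above could be sketched roughly as follows, assuming the Flink 1.13 DataSet-based State Processor API; the paths, the uid `uid-y`, and `MyState`/`MyStateReaderFunction`/`MyStateBootstrapFunction` are placeholders for the job's own classes, so treat this as a sketch rather than compilable code:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

public class FixOperatorYState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-2: load the old savepoint and read operator Y's keyed state.
        // MyState and MyStateReaderFunction stand in for the job's own classes.
        ExistingSavepoint old =
            Savepoint.load(env, "s3://savepoint-1", new MemoryStateBackend());
        DataSet<MyState> state = old.readKeyedState("uid-y", new MyStateReaderFunction());

        // Step 3: correct/fix the state inside a KeyedStateBootstrapFunction.
        BootstrapTransformation<MyState> fixed = OperatorTransformation
            .bootstrapWith(state)
            .keyBy(s -> s.key)
            .transform(new MyStateBootstrapFunction());

        // Steps 4-7: load the savepoint again, swap operator Y, write to a new path.
        Savepoint.load(env, "s3://savepoint-1", new MemoryStateBackend())
            .removeOperator("uid-y")
            .withOperator("uid-y", fixed)
            .write("s3://savepoint-2");

        env.execute();
    }
}
```

The job would then be restarted from the new path (`s3://savepoint-2` in this sketch) rather than the original savepoint.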

Best regards,
JING ZHANG

Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Jun 29, 2021 at 6:00 AM:
Let's say that a job has operators with UIDs W, X, Y, and Z, and uses RocksDB
as a backend with checkpoint data URI "s3://checkpoints".

Then I stop the job with a savepoint at s3://savepoint-1.

I assume that all the data within the checkpoint are stored within the given 
Savepoint. Is that assumption correct?

Then, how can I fix the state in operator with UID Y, but keep all the data in 
the other operators intact?

I know how to bootstrap state with the state-processor API.
I have not figured out how to correct / augment / fix the state though.

Can somebody please explain?


Re: State Processor API and existing state

2021-06-28 Thread JING ZHANG
Hi Marco,
> I assume that all the data within the checkpoint are stored within the
given Savepoint. Is that assumption correct?
Yes
> I have not figured out how to correct / augment / fix the state though.
Can somebody please explain?
Please try this way:
1. Load the old savepoint file to create Savepoint obj1.
2. Read the state of the operator with UID Y from the Savepoint obj1 returned by step 1.
3. Create a `BootstrapTransformation` via the entry-point class `OperatorTransformation`: bootstrap the new operator state with the dataset returned by step 2, correcting or fixing the old state of operator UID Y in a `StateBootstrapFunction` or `KeyedStateBootstrapFunction`.
4. Load the old savepoint file again to create Savepoint obj2.
5. Drop the old operator with UID Y by calling `removeOperator` on the Savepoint obj2 returned by step 4.
6. Add a new operator with UID Y by calling `withOperator` on the Savepoint obj2 returned by step 4; the first parameter is the uid (Y), the second is the `BootstrapTransformation` returned by step 3.
7. Write out the Savepoint obj2 returned by step 6 to a new path.

This way, the states of the operators with UIDs W, X, and Z are intact in the new savepoint files; only the state of operator Y is updated.
Details about reading, writing, and modifying savepoints can be found in the documentation [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/

Best regards,
JING ZHANG

Marco Villalobos wrote on Tue, Jun 29, 2021 at 6:00 AM:

> Let's say that a job has operators with UIDs W, X, Y, and Z, and uses
> RocksDB as a backend with checkpoint data URI "s3://checkpoints".
>
> Then I stop the job with a savepoint at s3://savepoint-1.
>
> I assume that all the data within the checkpoint are stored within the
> given Savepoint. Is that assumption correct?
>
> Then, how can I fix the state in operator with UID Y, but keep all the
> data in the other operators intact?
>
> I know how to bootstrap state with the state-processor API.
> I have not figured out how to correct / augment / fix the state though.
>
> Can somebody please explain?
>