[ https://issues.apache.org/jira/browse/FLINK-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15980978#comment-15980978 ]
Stefan Richter edited comment on FLINK-6353 at 4/24/17 10:30 AM:
-----------------------------------------------------------------

Yes, that is the idea. After implementing `CheckpointedFunction`, no more legacy state should be produced in future check/savepoints, and therefore there will be no more calls to `Checkpointed::restoreState()` when restoring from those new check/savepoints. The old interface can then be kept or dropped at the user's convenience. I think all we need to change is how some `instanceof` checks are evaluated; iirc, implementing both interfaces currently leads to a (purposeful) runtime exception.

> Restoring using CheckpointedRestoring does not work from 1.2 to 1.2
> -------------------------------------------------------------------
>
>                 Key: FLINK-6353
>                 URL: https://issues.apache.org/jira/browse/FLINK-6353
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.2.0, 1.2.1
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> State that was checkpointed using {{Checkpointed}} (on a user function)
> cannot be restored using {{CheckpointedRestoring}} when the savepoint was
> taken with Flink 1.2. The reason is an overzealous check in
> {{AbstractUdfStreamOperator}} that only restores from "legacy" operator state
> using {{CheckpointedRestoring}} when the stream is a {{Migration}} stream.
>
> We can remove that check, but we still need to make sure to read past (consume)
> the byte that indicates whether there is legacy state, which is written when
> we're restoring from a Flink 1.1 savepoint.
>
> Also, if we remove the check, the procedure for a user to migrate a user
> function away from the {{Checkpointed}} interface is this:
> # Perform a savepoint with the user function still implementing {{Checkpointed}}, then shut down the job.
> # Change the user function to implement {{CheckpointedRestoring}}.
> # Restore from the previous savepoint. The user function has to move the state that is restored using {{CheckpointedRestoring}} to another type of state, e.g. operator state, using the {{OperatorStateStore}}.
> # Perform another savepoint, then shut down the job.
> # Remove the {{CheckpointedRestoring}} interface from the user function.
> # Restore from the second savepoint.
> # Done.
>
> If the {{CheckpointedRestoring}} interface is not removed as prescribed in
> the last steps, then a future restore of a new savepoint will fail because
> Flink will try to read legacy operator state that is not there anymore.
>
> The above steps also apply to Flink 1.3, when a user wants to move away from
> the {{Checkpointed}} interface.
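The fix discussed above (drop the {{Migration}} stream check, but still read away the legacy-state flag byte) can be modeled in plain Java. This is a minimal sketch, not Flink code: `LegacyRestoring` is a hypothetical stand-in for `CheckpointedRestoring`, and the section layout (one flag byte, then the state as a single `long`) is an assumption made for illustration, as is the simplification that the flag byte is always present. What it shows is a restore that dispatches on the function's type via `instanceof` rather than on the stream type, and that always consumes the flag so whatever follows is read from the right offset.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LegacyRestoreSketch {

    // Hypothetical stand-in for Flink's CheckpointedRestoring<T>; not the real interface.
    interface LegacyRestoring<T> {
        void restoreState(T state) throws Exception;
    }

    // Writes a simplified (assumed) legacy section: a flag byte (1 = state follows,
    // 0 = none), then the state itself, modeled here as a single long.
    static byte[] writeLegacySection(Long state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (state == null) {
                out.writeByte(0);
            } else {
                out.writeByte(1);
                out.writeLong(state);
            }
        }
        return bytes.toByteArray();
    }

    // Restores the legacy section. The decision depends on the function's type, not on
    // what kind of stream is being read, and the flag byte is always consumed.
    @SuppressWarnings("unchecked")
    static void restoreLegacySection(DataInputStream in, Object userFunction) throws Exception {
        int hasLegacyState = in.readUnsignedByte();
        if (hasLegacyState == 0) {
            return; // nothing to hand over, but the flag byte has been read away
        }
        long state = in.readLong();
        if (userFunction instanceof LegacyRestoring) {
            ((LegacyRestoring<Long>) userFunction).restoreState(state);
        } else {
            throw new IllegalStateException(
                    "Found legacy state, but the function cannot restore it");
        }
    }
}
```

Throwing when legacy state is present but the function cannot accept it is a choice made for this sketch, since silently dropping state would be worse; how Flink itself reports that case is not claimed here.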
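The migration procedure above can be walked through with a similar JDK-only model. Again, none of these types are Flink's: `LegacyRestoring`, `OperatorState`, `CountingFunction` and `MigratingCountingFunction` are hypothetical, and the function's state is assumed to be a single counter. `MigratingCountingFunction` corresponds to steps 2 to 4 (accept the legacy value and re-home it in operator state), while `CountingFunction` is the function after step 5, with the legacy interface removed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MigrationSketch {

    // Hypothetical stand-in for the legacy CheckpointedRestoring<T> interface.
    interface LegacyRestoring<T> {
        void restoreState(T state);
    }

    // Hypothetical stand-in for a savepoint's operator state: named lists of values.
    static final class OperatorState {
        final Map<String, List<Long>> lists = new HashMap<>();

        List<Long> getListState(String name) {
            return lists.computeIfAbsent(name, k -> new ArrayList<>());
        }
    }

    // The function after step 5: it only reads and writes operator state.
    static class CountingFunction {
        long count;

        void initializeState(OperatorState restored) {
            for (long c : restored.getListState("count")) {
                count += c; // add rather than assign, so restore order does not matter
            }
        }

        OperatorState snapshotState() {
            OperatorState snapshot = new OperatorState();
            snapshot.getListState("count").add(count);
            return snapshot;
        }
    }

    // The function during steps 2 to 4: it additionally accepts the legacy value and
    // folds it into the count, so the next snapshot writes it as operator state.
    static class MigratingCountingFunction extends CountingFunction
            implements LegacyRestoring<Long> {
        @Override
        public void restoreState(Long legacyCount) {
            count += legacyCount;
        }
    }
}
```

Because both restore hooks add into `count` instead of assigning it, the sketch does not depend on whether the legacy restore runs before or after `initializeState`; the actual call order in Flink is not assumed. For example, calling `restoreState(5L)` on a `MigratingCountingFunction` (restoring the first savepoint), taking `snapshotState()` as the second savepoint, and then initializing a plain `CountingFunction` from it leaves that function with `count == 5`.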