[jira] [Comment Edited] (FLINK-6353) Restoring using CheckpointedRestoring does not work from 1.2 to 1.2

2017-04-24 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980978#comment-15980978
 ] 

Stefan Richter edited comment on FLINK-6353 at 4/24/17 10:32 AM:
-

Yes, that is the idea. After implementing {{CheckpointedFunction}}, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to {{Checkpointed::restoreState()}} will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is how some 
{{instanceof}} checks are evaluated; iirc, implementing both interfaces 
currently leads to an (intentional) runtime exception.
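
A minimal sketch of the restore behaviour described above, assuming simplified stand-in types: {{LegacyRestorable}}, {{ManagedRestorable}}, {{Savepoint}} and {{Counter}} are hypothetical names for illustration and are not Flink's actual interfaces or operator code. It only shows the idea that the legacy restore path runs when a savepoint still carries legacy state, and is skipped otherwise.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RestoreDispatchSketch {

    /** Hypothetical stand-in for the legacy Checkpointed interface (restore side only). */
    public interface LegacyRestorable {
        void restoreLegacy(long state);
    }

    /** Hypothetical stand-in for CheckpointedFunction (restore side only). */
    public interface ManagedRestorable {
        void initializeManaged(List<Long> state);
    }

    /** A savepoint in this sketch: optional legacy state plus managed list state. */
    public static final class Savepoint {
        public final Long legacyState; // null: no legacy state was written
        public final List<Long> managedState;

        public Savepoint(Long legacyState, List<Long> managedState) {
            this.legacyState = legacyState;
            this.managedState = managedState;
        }
    }

    /** User function that implements both interfaces during the transition. */
    public static final class Counter implements LegacyRestorable, ManagedRestorable {
        public long count;
        public int legacyRestoreCalls;

        @Override
        public void restoreLegacy(long state) {
            legacyRestoreCalls++;
            count = state;
        }

        @Override
        public void initializeManaged(List<Long> state) {
            for (long v : state) {
                count += v;
            }
        }
    }

    /** Restores a function; the legacy path runs only if legacy state exists. */
    public static void restore(Object fn, Savepoint sp) {
        if (fn instanceof ManagedRestorable) {
            ((ManagedRestorable) fn).initializeManaged(sp.managedState);
        }
        if (sp.legacyState != null && fn instanceof LegacyRestorable) {
            ((LegacyRestorable) fn).restoreLegacy(sp.legacyState);
        }
    }

    public static void main(String[] args) {
        Counter fromOld = new Counter();
        restore(fromOld, new Savepoint(5L, Collections.<Long>emptyList()));
        Counter fromNew = new Counter();
        restore(fromNew, new Savepoint(null, Arrays.asList(5L)));
        System.out.println(fromOld.legacyRestoreCalls + " " + fromNew.legacyRestoreCalls);
    }
}
```

In this sketch the same function class can restore from either kind of savepoint, which is why the old interface could be kept or dropped at the user's convenience.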


was (Author: srichter):
Yes, that is the idea. After implementing {{CheckpointedFunction}}, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to {{Checkpointed::restoreState()}} will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is just how some 
{{instanceof}} checks are evaluated, iirc implementing both interfaces 
currently leads to a (purposeful) runtime exception.

> Restoring using CheckpointedRestoring does not work from 1.2 to 1.2
> ---
>
> Key: FLINK-6353
> URL: https://issues.apache.org/jira/browse/FLINK-6353
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.2.0, 1.2.1
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> State that was checkpointed using {{Checkpointed}} (on a user function) 
> cannot be restored using {{CheckpointedRestoring}} when the savepoint was 
> done on Flink 1.2. The reason is an overzealous check in 
> {{AbstractUdfStreamOperator}} that only restores from "legacy" operator state 
> using {{CheckpointedRestoring}} when the stream is a {{Migration}} stream.
> We can remove that check but still need to make sure to read away the byte 
> that indicates whether there is legacy state, which is written when we're 
> restoring from a Flink 1.1 savepoint.
> Also, if we remove the check, the procedure for a user to migrate a user 
> function away from the {{Checkpointed}} interface is this:
>  # Perform savepoint with user function still implementing {{Checkpointed}}, 
> shut down job
>  # Change user function to implement {{CheckpointedRestoring}}
>  # Restore from previous savepoint; the user function has to move the 
> state that is restored using {{CheckpointedRestoring}} to another type of 
> state, e.g. operator state, using the {{OperatorStateStore}}.
>  # Perform another savepoint, shutdown job
>  # Remove {{CheckpointedRestoring}} interface from user function
>  # Restore from the second savepoint
>  # Done.
> If the {{CheckpointedRestoring}} interface is not removed as prescribed in 
> the last steps, then a future restore of a new savepoint will fail because 
> Flink will try to read legacy operator state that is not there anymore.
> The above steps also apply to Flink 1.3, when a user wants to move away from 
> the {{Checkpointed}} interface.
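
The migration procedure in the issue description can be sketched roughly as follows. This is an illustration under stated assumptions, not Flink code: {{LegacyRestoring}} and the three {{CounterV*}} classes are hypothetical stand-ins, and a plain {{List<Long>}} plays the role of list state obtained from the {{OperatorStateStore}}.

```java
import java.util.ArrayList;
import java.util.List;

public class LegacyStateMigrationSketch {

    /** Hypothetical stand-in for the restore-only CheckpointedRestoring interface. */
    public interface LegacyRestoring {
        void restoreLegacy(long state);
    }

    /** Step 1: the original function, which still writes legacy state. */
    public static final class CounterV1 {
        public long count;

        public long snapshotLegacy() {
            return count;
        }
    }

    /** Steps 2 to 4: reads legacy state, but snapshots only operator (list) state. */
    public static final class CounterV2 implements LegacyRestoring {
        public long count;

        @Override
        public void restoreLegacy(long state) {
            count = state;
        }

        public void snapshotOperatorState(List<Long> target) {
            target.clear();
            target.add(count);
        }
    }

    /** Steps 5 and 6: the legacy interface is gone; only operator state is read. */
    public static final class CounterV3 {
        public long count;

        public void initializeOperatorState(List<Long> stored) {
            for (long v : stored) {
                count += v;
            }
        }
    }

    /** Walks one value through both savepoints of the procedure. */
    public static long migrate(long initialCount) {
        CounterV1 v1 = new CounterV1();
        v1.count = initialCount;
        long firstSavepoint = v1.snapshotLegacy();          // step 1

        CounterV2 v2 = new CounterV2();                     // step 2
        v2.restoreLegacy(firstSavepoint);                   // step 3
        List<Long> secondSavepoint = new ArrayList<>();
        v2.snapshotOperatorState(secondSavepoint);          // step 4

        CounterV3 v3 = new CounterV3();                     // step 5
        v3.initializeOperatorState(secondSavepoint);        // step 6
        return v3.count;
    }

    public static void main(String[] args) {
        System.out.println(migrate(42L));
    }
}
```

The second savepoint in this sketch contains no legacy state at all, which matches the warning above: a function that still expects legacy state after that point would have nothing to read.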



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6353) Restoring using CheckpointedRestoring does not work from 1.2 to 1.2

2017-04-24 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980978#comment-15980978
 ] 

Stefan Richter edited comment on FLINK-6353 at 4/24/17 10:30 AM:
-

Yes, that is the idea. After implementing `CheckpointedFunction`, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to `Checkpointed::restoreState()` will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is just how some 
`instanceof` checks are evaluated, iirc implementing both interfaces currently 
leads to a (purposeful) runtime exception.


was (Author: srichter):
Yes, that is the idea. After implementing `CheckpointedFunction`, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to `Checkpointed::restoreState()` will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is just how some 
instance of checks are evaluated, iirc implementing both interfaces currently 
leads to a (purposeful) runtime exception.



[jira] [Comment Edited] (FLINK-6353) Restoring using CheckpointedRestoring does not work from 1.2 to 1.2

2017-04-24 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980978#comment-15980978
 ] 

Stefan Richter edited comment on FLINK-6353 at 4/24/17 10:30 AM:
-

Yes, that is the idea. After implementing {{CheckpointedFunction}}, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to {{Checkpointed::restoreState()}} will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is just how some 
{{instanceof}} checks are evaluated, iirc implementing both interfaces 
currently leads to a (purposeful) runtime exception.


was (Author: srichter):
Yes, that is the idea. After implementing `CheckpointedFunction`, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to `Checkpointed::restoreState()` will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is just how some 
`instanceof` checks are evaluated, iirc implementing both interfaces currently 
leads to a (purposeful) runtime exception.



[jira] [Comment Edited] (FLINK-6353) Restoring using CheckpointedRestoring does not work from 1.2 to 1.2

2017-04-24 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980978#comment-15980978
 ] 

Stefan Richter edited comment on FLINK-6353 at 4/24/17 10:29 AM:
-

Yes, that is the idea. After implementing `CheckpointedFunction`, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to `Checkpointed::restoreState()` will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is just how some 
instance of checks are evaluated, iirc implementing both interfaces currently 
leads to a (purposeful) runtime exception.


was (Author: srichter):
Yes, that is the idea. After implementing `CheckpointedFunction`, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to `Checkpointed::restoreState()` will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is just how some 
instance of checks are evaluated, iirc implementing both interfaces is 
currently leads to a (purposeful) runtime exception.



[jira] [Comment Edited] (FLINK-6353) Restoring using CheckpointedRestoring does not work from 1.2 to 1.2

2017-04-24 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980978#comment-15980978
 ] 

Stefan Richter edited comment on FLINK-6353 at 4/24/17 10:29 AM:
-

Yes, that is the idea. After implementing `CheckpointedFunction`, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to `Checkpointed::restoreState()` will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience. I think all we need to change is just how some 
instance of checks are evaluated, iirc implementing both interfaces is 
currently leads to a (purposeful) runtime exception.


was (Author: srichter):
Yes, that is the idea. After implementing `CheckpointedFunction`, no more 
legacy state should be produced in future check/savepoints and therefore no 
more calls to `Checkpointed::restoreState()` will happen when restoring from 
those new check/savepoints. The old interface can then be kept or dropped 
at the user's convenience.



[jira] [Comment Edited] (FLINK-6353) Restoring using CheckpointedRestoring does not work from 1.2 to 1.2

2017-04-21 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979042#comment-15979042
 ] 

Stefan Richter edited comment on FLINK-6353 at 4/21/17 4:42 PM:


Just a small idea: do you think that, from a user's perspective, we should simply 
allow implementing both {{Checkpointed}} and the new {{CheckpointedFunction}}, 
with the result that {{CheckpointedFunction}} overrides {{Checkpointed}} for 
snapshots, essentially making it equivalent to {{CheckpointedRestoring}}? We 
can log that {{Checkpointed}} is disabled for the snapshots. Then, starting 
from the next savepoint, the {{Checkpointed}} interface could be dropped at any 
time, or kept for backwards compatibility. This way we could avoid fiddling 
around with too many different interfaces.

Then the upgrade story is simply:

1. Implement {{CheckpointedFunction}}.
2. Drop {{Checkpointed}} at convenience, when backwards compatibility is 
obsolete.
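
To make the proposal concrete, here is a rough sketch of the snapshot side, assuming simplified stand-in types: {{LegacySnapshotting}}, {{ManagedSnapshotting}}, {{Result}} and the two example functions are hypothetical and do not mirror Flink's real signatures. When a function implements both interfaces, only the new-style snapshot is taken and a message records that the legacy snapshot was skipped.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotPrecedenceSketch {

    /** Hypothetical stand-in for the snapshot side of the legacy Checkpointed interface. */
    public interface LegacySnapshotting {
        long snapshotLegacy();
    }

    /** Hypothetical stand-in for the snapshot side of CheckpointedFunction. */
    public interface ManagedSnapshotting {
        void snapshotManaged(List<Long> target);
    }

    /** Outcome of one snapshot: optional legacy state, managed state, log lines. */
    public static final class Result {
        public final Long legacyState; // null: no legacy state was produced
        public final List<Long> managedState;
        public final List<String> log;

        public Result(Long legacyState, List<Long> managedState, List<String> log) {
            this.legacyState = legacyState;
            this.managedState = managedState;
            this.log = log;
        }
    }

    /** The new interface takes precedence; the legacy one is used only on its own. */
    public static Result snapshot(Object fn) {
        List<Long> managed = new ArrayList<>();
        List<String> log = new ArrayList<>();
        Long legacy = null;
        boolean isManaged = fn instanceof ManagedSnapshotting;
        if (isManaged) {
            ((ManagedSnapshotting) fn).snapshotManaged(managed);
        }
        if (fn instanceof LegacySnapshotting) {
            if (isManaged) {
                log.add("legacy snapshot disabled: function also implements the new interface");
            } else {
                legacy = ((LegacySnapshotting) fn).snapshotLegacy();
            }
        }
        return new Result(legacy, managed, log);
    }

    /** Function in transition: implements both interfaces. */
    public static final class Both implements LegacySnapshotting, ManagedSnapshotting {
        public long count;

        @Override
        public long snapshotLegacy() {
            return count;
        }

        @Override
        public void snapshotManaged(List<Long> target) {
            target.add(count);
        }
    }

    /** Function that has not been migrated yet. */
    public static final class LegacyOnly implements LegacySnapshotting {
        public long count;

        @Override
        public long snapshotLegacy() {
            return count;
        }
    }

    public static void main(String[] args) {
        Both both = new Both();
        both.count = 3;
        Result r = snapshot(both);
        System.out.println(r.legacyState + " " + r.managedState + " " + r.log);
    }
}
```

Under this scheme the first savepoint taken after step 1 already contains no legacy state, so step 2 can happen at any later time.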


was (Author: srichter):
Just a small idea: do you think that, from a user's perspective, we should simply 
allow implementing both {{Checkpointed}} and the new {{CheckpointedFunction}}, 
with the result that {{CheckpointedFunction}} overrides {{Checkpointed}} for 
snapshots, essentially making it equivalent to {{CheckpointedRestoring}}? We 
can log that {{Checkpointed}} is disabled for the snapshots. Then, starting 
from the next savepoint, the {{Checkpointed}} interface could be dropped at any 
time, or kept for backwards compatibility. This way we could avoid fiddling 
around with too many different interfaces.



[jira] [Comment Edited] (FLINK-6353) Restoring using CheckpointedRestoring does not work from 1.2 to 1.2

2017-04-21 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979042#comment-15979042
 ] 

Stefan Richter edited comment on FLINK-6353 at 4/21/17 4:40 PM:


Just a small idea: do you think that, from a user's perspective, we should simply 
allow implementing both {{Checkpointed}} and the new {{CheckpointedFunction}}, 
with the result that {{CheckpointedFunction}} overrides {{Checkpointed}} for 
snapshots, essentially making it equivalent to {{CheckpointedRestoring}}? We 
can log that {{Checkpointed}} is disabled for the snapshots. Then, starting 
from the next savepoint, the {{Checkpointed}} interface could be dropped at any 
time, or kept for backwards compatibility. This way we could avoid fiddling 
around with too many different interfaces.


was (Author: srichter):
Just a small idea: do you think that from a users perspective we should simply 
allow to implement both, {{Checkpointed}} and the new {{CheckpointedFunction}} 
with the result that {{CheckpointedFunction}} overrides {{Checkpointed}} for 
snapshots, essentially making it equivalent to {{CheckpointedRestoring}}. We 
can log that that {{Checkpointed}} is disabled on the snapshots. Then, starting 
from the next savepoint, the {{Checkpointed}} interface could be dropped at any 
time, or kept for backwards compatibility. This way we could avoid fiddling 
around with to many different interfaces.
