Re: Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-07 Thread Arvid Heise
Hi Jiahui,

changing the job graph is what I meant by application upgrade. There is no
difference between checkpoints and savepoints in this respect, afaik. New
operators would be initialized with empty state - which is correct for
stateless operators - so it should work for all the sketched cases, with both
savepoints and retained checkpoints.

Just to be clear: quite a few SQL operators have state (e.g., joins, windows,
aggregations). So double-check that the operators are really stateless
(projection, selection). Also, exactly-once sinks tend to retain state (e.g.,
Kafka), so you might want to fix the UID for the sink as well.

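For illustration, "fixing the UID" on the stateful endpoints looks roughly
like this in the DataStream API (a minimal sketch - the Kafka connector,
topic names, and UIDs are placeholders, not the SQL planner's actual
operators):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class FixedUidJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "demo");

            env.addSource(new FlinkKafkaConsumer<>("input", new SimpleStringSchema(), props))
                    .uid("my-source")   // fixed UID: offset state is matched by this ID on restore
                    .map(String::trim)  // stateless; auto-generated UID, restored empty after graph changes
                    .addSink(new FlinkKafkaProducer<>("output", new SimpleStringSchema(), props))
                    .uid("my-sink");    // fixed UID: the sink's state (e.g. Kafka transactions) also needs it

            env.execute("fixed-uid-job");
        }
    }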

Re: Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-07 Thread Jiahui Jiang
Hello Arvid, how about no upgrade, just changing the job graph by having
different stateless operators? Will a checkpoint be sufficient?

The specific example use case is: we have some infrastructure that
orchestrates and runs user SQL queries. Sometimes, in between runs, users
change their queries, which causes the job graph to be different. But we
always give the stateful source function a fixed UID, so we won't reprocess
the same input record twice even if the logic changes.

If the user explicitly stops the running application to change their code, we
can trigger a savepoint at that point.

If the application fails after the failover strategy is exhausted (sometimes
just through network blips), we can't guarantee triggering a savepoint, and
there will only be a retained checkpoint. Will a job graph that has the same
stateful operators but different stateless operators be able to recover from
this checkpoint?

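For reference, we retain checkpoints roughly like this (a sketch against the
1.13-era CheckpointConfig API; the interval is illustrative):

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L);  // checkpoint every 60s (illustrative)
    // Keep the last completed checkpoint after failure/cancellation so it can
    // serve as a restore point when no savepoint was taken.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
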
Thanks!


Re: Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-07 Thread Arvid Heise
Hi Jiahui,

Savepoint format is more universal and should be used when upgrading Flink
versions. If you just upgrade the application itself, there shouldn't be a
big difference afaik.



Re: Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-06 Thread Jiahui Jiang
Hello Roman,

Sorry, I did some more testing and the original failure was caused by a
different part of the pipeline. When I added a new stateless operator, the
job was able to restart from the previous savepoint with no issue.

Another question I have, since you explicitly asked whether it's a savepoint
or a checkpoint: what are the behavior differences when recovering from a
checkpoint vs. a savepoint? If the job graph changes between runs but all the
stateful operators are guaranteed to have their UIDs fixed, will a pipeline
be able to restore from the retained checkpoint if incremental checkpointing
is disabled?
Thank you!

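PS: for completeness, incremental checkpointing in my test is toggled roughly
like this (a sketch, assuming the RocksDB state backend and the 1.13
EmbeddedRocksDBStateBackend constructor):

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // false = full (self-contained) checkpoints instead of incremental RocksDB
    // deltas that reference files from earlier checkpoints.
    env.setStateBackend(new EmbeddedRocksDBStateBackend(false));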


Re: Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-02 Thread Roman Khachatryan
Hi,

Just to clarify, you are recovering from a savepoint, not a retained
checkpoint, right?
And how are you setting the operator IDs?
You mentioned that with allowNonRestoredState set to false recovery fails.
Does it succeed with this flag set to true?

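(For reference, the flag can be passed when resubmitting the job, e.g. via
the CLI's --allowNonRestoredState option, or programmatically - a sketch,
assuming the client-side SavepointRestoreSettings API; the path is
illustrative:)

    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

    // Restore from a savepoint or retained checkpoint path. The boolean is
    // allowNonRestoredState: when true, snapshot state that no longer matches
    // any operator UID in the new job graph is skipped instead of failing.
    SavepointRestoreSettings restore =
            SavepointRestoreSettings.forPath("hdfs:///flink/savepoints/savepoint-ab12cd", true);
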
Answering your questions:
Each operator's state is stored under its corresponding ID in the snapshot
(though technically the snapshot for the whole chain is sent as a single
object to the JM).
Probably some intermediate operators have state. How do you verify that they
don't? The exception message could help to identify the problematic
operators.

Regards,
Roman




Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-02 Thread Jiahui Jiang
Hello Flink,

I'm trying to understand the state recovery mechanism when there are extra 
stateless operators.

I'm using flink-sql, and I tested a 'select `number_col` from source' query, 
where the stream graph looks like:

`source (stateful with fixed uid) -> [several stateless operators translated by 
Flink] -> sink (stateful with fixed uid)`

I have enabled chaining, so these operators are all chained into one task
vertex.

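(Roughly, the experiment setup is the following - a Table API sketch, with
the connector DDL and the UID wiring for source/sink elided:)

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
    // CREATE TABLE source (...) / CREATE TABLE sink (...) elided; both
    // connectors are stateful (source offsets, sink transactions).
    tEnv.executeSql("INSERT INTO sink SELECT number_col FROM source");  // run 1
    // Changed between runs; adds one stateless Calc operator to the graph:
    // tEnv.executeSql("INSERT INTO sink SELECT 1 FROM source");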

According to Flink's docs, I should be able to start a new job with a
different job graph as long as all the previous stateful operators can still
be found in the graph. But when I tested recovery from the previous state
with a new query, 'select `1` from source', the generated stream graph had
one extra stateless operator and failed to recover when
allowNonRestoredState was set to false.

I'm wondering how Flink stores operator state when chaining is enabled.
Does it (1) store each operator's state separately (so the source and sink
each have their own entry in the checkpoint state), or (2) store the state
for all the operators chained into the same subtask (source, sink, and all
the SQL transformation operators) under a single operator ID?

In this experiment I fixed the source's and sink's UIDs; why does that not
seem to force the stateful operators to recover from their own state?

Thank you!


https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/#savepoint-state