Hi Jiahui,

changing the job graph is what I meant with application upgrade. There is
no difference between checkpoint and savepoint afaik. New operators would
be initialized with empty state - so correct for stateless operators. So it
should work for all sketched cases with both savepoints and retained
checkpoints.

Just to be clear: quite a few SQL operators have state (e.g., joins,
windows, aggregation). So double-check if the operators are really
stateless (projection, selection). Also exactly-once sinks tend to retain
state (e.g. Kafka), so you might want to fix the UID for the sink as well.

On Wed, Jul 7, 2021 at 3:10 PM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:

> Hello Arvid, how about no upgrade, just changing the job graph by having
> different stateless operators? Will checkpoint be sufficient?
>
> The specific example use case is - we have some infrastructure that
> orchestrates and runs user SQL queries. Sometimes in between runs users
> might have changed their queries and cause the job graph to be different.
> But for the stateful source function we always set it to a fixed UID. So we
> won’t reprocess the same input record twice even if the logic changes.
>
> If user explicitly stops the running application and changes their code,
> we can trigger a savepoint while that happens.
>
> If the application fails after failover strategy is satisfied (sometimes
> just through network blips), we can’t guarantee triggering a savepoint, and
> there will only be a retained checkpoint. Will a job graph that has the
> same statefull operators but different stateless operators be able to
> recover from this checkpoint?
>
> Thanks!
> ------------------------------
> *From:* Arvid Heise <ar...@apache.org>
> *Sent:* Wednesday, July 7, 2021 5:20 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* ro...@apache.org <ro...@apache.org>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Understanding recovering from savepoint / checkpoint with
> additional operators when chaining
>
> Hi Jiahui,
>
> Savepoint format is more universal and should be used when upgrading Flink
> versions. If you just upgrade the applications, there shouldn't be a big
> difference afaik.
>
> On Tue, Jul 6, 2021 at 9:41 PM Jiahui Jiang <qzhzm173...@hotmail.com>
> wrote:
>
> Hello Roman,
>
> Sorry I did some more testing and the original failure was caused by a
> different part of the pipeline. We I added a new stateless operator, it was
> able to restart from the previous savepoint with no issue.
>
> Another question I have is, since you explicitly asked whether it's a
> savepoint or checkpoint,
> What are the behavior differences when recovering from a checkpoint vs. a
> savepoint? If the job graph changes between runs, but all the stateful
> operators are guaranteed to have their UID fixed. Will a pipeline be able
> to restore from the retained checkpoint if incremental checkpoint is
> disabled?
>
> Thank you!
> ------------------------------
> *From:* Roman Khachatryan <ro...@apache.org>
> *Sent:* Friday, July 2, 2021 4:59 PM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Understanding recovering from savepoint / checkpoint with
> additional operators when chaining
>
> Hi,
>
> Just to clarify, you are recovering from a savepoint, not a retained
> checkpoint, right?
> And how are you setting the operator IDs?
> You mentioned that with allowNonRestoredState set to false recovery fails.
> Does it succeed with this flag set to true?
>
> Answering your questions:
> Each operator state has the corresponding ID in the snapshot (though
> technically the snapshot for the chain is sent as a single object to the
> JM).
> Probably some intermediate operators have state. How do you verify that
> they don't? Exception message could probably help to identify the
> problematic operators.
>
> Regards,
> Roman
>
>
> On Fri, Jul 2, 2021 at 7:52 PM Jiahui Jiang <qzhzm173...@hotmail.com>
> wrote:
>
> Hello Flink,
>
> I'm trying to understand the state recovery mechanism when there are extra
> stateless operators.
>
> I'm using flink-sql, and I tested a 'select `number_col` from source'
> query, where the stream graph looks like:
>
> `source (stateful with fixed uid) -> [several stateless operators
> translated by Flink] -> sink (stateful with fixed uid)`
>
> I have enabled chaining, so these operators are all chaining into one task
> vertex.
>
>
> According to Flink's docs, I should be able to start a new job with
> different job graph as long as all the previous stateful operators can
> still be found in the graph.
> But when I tested recovery from the previous state with a new query
> 'select `1` from source'.
>
> The generated stream graph had one extra stateless operator, but failed to
> recover when there allowNonRestoredState was set to false.
>
> I'm wondering how's Flink storing operator state when chaining is enabled?
> Does it (1) store each operator state separately (source and sink has its
> own entry in the checkpoint state) or (2) store the state for all the
> operators chained into the same subtask (source, sink, all the SQL
> transformation operators) all under the same operator ID?
>
> In this experiement I have fixed the source and sink's uids, why does that
> seem to not have an effect on forcing the stateful operators to recover
> from its own state?
>
> Thank you!
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/#savepoint-state
> Savepoints | Apache Flink
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/#savepoint-state>
> Savepoints # What is a Savepoint? How is a Savepoint different from a
> Checkpoint? # A Savepoint is a consistent image of the execution state of a
> streaming job, created via Flink’s checkpointing mechanism. You can use
> Savepoints to stop-and-resume, fork, or update your Flink jobs. Savepoints
> consist of two parts: a directory with (typically large) binary files on
> stable storage (e.g. HDFS ...
> ci.apache.org
>
>

Reply via email to