Hello Flink,

I'm trying to understand the state recovery mechanism when there are extra 
stateless operators.

I'm using flink-sql, and I tested a 'select `number_col` from source' query, 
where the stream graph looks like:

`source (stateful with fixed uid) -> [several stateless operators translated by 
Flink] -> sink (stateful with fixed uid)`

I have enabled chaining, so these operators are all chained into one task 
vertex.


According to Flink's docs, I should be able to start a new job with a different 
job graph as long as all the previous stateful operators can still be found in 
the graph.
But when I tested recovery from the previous state with a new query, 'select `1` 
from source', the generated stream graph had one extra stateless operator, and 
the job failed to recover when allowNonRestoredState was set to false.
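
To make my reading of the docs concrete, here is a toy Python model of the 
restore rule as I understand it. This is only a sketch, not Flink's actual 
code: the function `check_restore`, the ID strings, and the dictionary layout 
are all made up for illustration.

```python
def check_restore(savepoint_state, new_graph_ids, allow_non_restored_state=False):
    """Toy model (not Flink code): return the savepoint entries that can be restored.

    savepoint_state: dict mapping operator ID -> state bytes (hypothetical layout)
    new_graph_ids:   set of operator IDs present in the new job graph
    """
    restorable = {}
    for op_id, state in savepoint_state.items():
        if op_id in new_graph_ids:
            # The operator still exists in the new graph: its state is reused.
            restorable[op_id] = state
        elif not allow_non_restored_state:
            # State with no matching operator is an error unless explicitly allowed.
            raise RuntimeError(
                f"state for operator {op_id!r} cannot be mapped to the new job"
            )
        # Otherwise the unmatched state is silently dropped.
    return restorable


# Assumption: only the source and sink have entries, keyed by their fixed uids.
savepoint = {"source-uid": b"offsets", "sink-uid": b"pending"}
# The new graph contains the same two uids plus some generated stateless operators.
new_graph = {"source-uid", "generated-1", "generated-2", "sink-uid"}

restored = check_restore(savepoint, new_graph)
```

Under this model the restore would succeed even with the extra stateless 
operator, which is why the failure I observed surprised me.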

I'm wondering how Flink stores operator state when chaining is enabled. 
Does it (1) store each operator's state separately (the source and sink each 
have their own entry in the checkpoint state), or (2) store the state for all 
the operators chained into the same subtask (source, sink, and all the SQL 
transformation operators) under the same operator ID?

In this experiment I have fixed the source's and sink's uids. Why does that 
seem to have no effect on forcing the stateful operators to recover from their 
own state?

Thank you!


https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/#savepoint-state
