Zhanghao Chen created FLINK-33962:
-------------------------------------
Summary: Chaining-agnostic OperatorID generation for improved
state compatibility on parallelism change
Key: FLINK-33962
URL: https://issues.apache.org/jira/browse/FLINK-33962
Project: Flink
Issue Type: Improvement
Components: API / Core
Reporter: Zhanghao Chen
*Background*
Flink restores operator state from snapshots by matching OperatorIDs.
Since Flink 1.2, {{StreamGraphHasherV2}} is used for OperatorID generation when
no user-set uid exists. The generated OperatorID is deterministic with respect
to:
* node-local properties (the traversal ID in the BFS over the DAG)
* chained output nodes
* input node hashes
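To make the three inputs concrete, here is a minimal sketch of how they could be combined into one hash. It is not the Flink implementation: {{OperatorHashSketch}} and {{nodeHash}} are hypothetical names, SHA-256 from the JDK stands in for Flink's own hash function, and the exact folding of input hashes is an assumption.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical illustration only; none of these names are Flink API.
class OperatorHashSketch {

    public static byte[] nodeHash(int traversalId, int chainedOutputCount, List<byte[]> inputHashes) {
        MessageDigest digest;
        try {
            // Assumption: SHA-256 is a stand-in for the hash function Flink uses.
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        byte[] id = ByteBuffer.allocate(4).putInt(traversalId).array();

        // 1. Node-local property: the position of the node in the BFS traversal.
        digest.update(id);

        // 2. Chained output nodes: one extra marker per chained output.
        for (int i = 0; i < chainedOutputCount; i++) {
            digest.update(id);
        }
        byte[] hash = digest.digest();

        // 3. Input node hashes: fold every upstream hash into this node's hash.
        for (byte[] input : inputHashes) {
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ input[j]);
            }
        }
        return hash;
    }

    public static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] source = nodeHash(0, 0, List.of());
        byte[] sink = nodeHash(1, 0, List.of(source));
        System.out.println("source: " + toHex(source));
        System.out.println("sink:   " + toHex(sink));
    }
}
```

Because a node's hash folds in the hashes of its inputs, any change to an upstream hash propagates to every downstream node.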
*Problem*
Chaining behavior affects state compatibility, because the generated
OperatorID of an operator depends on its chained output nodes. For example, a
simple source->sink DAG with source and sink chained together is state
incompatible with an otherwise identical DAG with source and sink unchained
(either because the parallelisms of the two operators are changed to be unequal
or because chaining is disabled). This greatly limits the flexibility to break
or join chains for performance tuning.
*Proposal*
Introduce {{StreamGraphHasherV3}}, which is agnostic to the chaining behavior
of operators. Effectively, it just removes L227-235 of
[StreamGraphHasherV2.java|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
This will not hurt the determinism of ID generation across job submissions as
long as the stream graph topology doesn't change. Since new versions of Flink
have already adopted pure operator-level state recovery, it will also not break
state recovery across job submissions as long as both submissions use the same
hasher.
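The effect of the proposal on the source->sink example can be sketched as follows. This is a simplified model under stated assumptions, not the actual hasher code: {{ChainingAgnosticSketch}}, {{nodeHash}} and {{sourceSinkHashes}} are hypothetical names, SHA-256 stands in for Flink's hash function, and the {{chainingAware}} flag models the difference between the current behavior and the proposed one.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names are Flink API.
class ChainingAgnosticSketch {

    /** Simplified node hash; chainingAware=false models the proposed behavior. */
    public static byte[] nodeHash(
            int traversalId, int chainedOutputs, List<byte[]> inputHashes, boolean chainingAware) {
        try {
            // Assumption: SHA-256 is a stand-in for the hash function Flink uses.
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] id = ByteBuffer.allocate(4).putInt(traversalId).array();
            digest.update(id);
            if (chainingAware) {
                // Current behavior: chained outputs contribute to the hash.
                for (int i = 0; i < chainedOutputs; i++) {
                    digest.update(id);
                }
            }
            byte[] hash = digest.digest();
            // Fold in the upstream hashes in both variants.
            for (byte[] input : inputHashes) {
                for (int j = 0; j < hash.length; j++) {
                    hash[j] = (byte) (hash[j] * 37 ^ input[j]);
                }
            }
            return hash;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns {sourceHash, sinkHash} for a source->sink DAG. */
    public static byte[][] sourceSinkHashes(boolean chained, boolean chainingAware) {
        byte[] source = nodeHash(0, chained ? 1 : 0, List.of(), chainingAware);
        byte[] sink = nodeHash(1, 0, List.of(source), chainingAware);
        return new byte[][] {source, sink};
    }

    public static void main(String[] args) {
        boolean awareStable =
                Arrays.deepEquals(sourceSinkHashes(true, true), sourceSinkHashes(false, true));
        boolean agnosticStable =
                Arrays.deepEquals(sourceSinkHashes(true, false), sourceSinkHashes(false, false));
        System.out.println("chaining-aware IDs survive unchaining: " + awareStable);
        System.out.println("chaining-agnostic IDs survive unchaining: " + agnosticStable);
    }
}
```

In this model the chaining-aware variant yields different IDs for both source and sink once the chain is broken, while the chaining-agnostic variant yields identical IDs in both layouts.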
This will, however, break cross-version state compatibility. So we can
introduce a new option to enable HasherV3 in v1.19 and consider making it
the default hasher in v2.0.
Looking forward to suggestions on this.