Zhanghao Chen created FLINK-33962:
-------------------------------------

             Summary: Chaining-agnostic OperatorID generation for improved 
state compatibility on parallelism change
                 Key: FLINK-33962
                 URL: https://issues.apache.org/jira/browse/FLINK-33962
             Project: Flink
          Issue Type: Improvement
          Components: API / Core
            Reporter: Zhanghao Chen


*Background*

Flink restores operator state from snapshots by matching OperatorIDs. Since 
Flink 1.2, when no user-set uid exists, {{StreamGraphHasherV2}} is used to 
generate the OperatorID. The generated OperatorID is deterministic with respect 
to:
 * node-local properties (the node's traversal ID in a BFS over the DAG)
 * chained output nodes
 * input nodes' hashes
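To make these three inputs concrete, here is a simplified, self-contained sketch of a V2-style hash for a single node. It is not Flink's code: MD5 from {{java.security}} stands in for the hash function Flink actually uses, the node is described by plain arguments instead of {{StreamNode}}/{{StreamEdge}} objects, and the names {{HasherV2Sketch}} and {{nodeHash}} as well as the multiply-by-37-and-XOR combining step are illustrative assumptions.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

/**
 * Hypothetical, simplified sketch of a V2-style operator hash for one node.
 * Assumptions: MD5 stands in for Flink's hash function, and the node is
 * described by plain arguments instead of StreamNode/StreamEdge objects.
 */
public final class HasherV2Sketch {

    /**
     * @param traversalIndex    position of the node in the BFS traversal
     * @param chainableOutEdges number of out-edges that can be chained
     * @param inputHashes       already-computed hashes of the node's inputs
     */
    static byte[] nodeHash(int traversalIndex, int chainableOutEdges, List<byte[]> inputHashes) {
        MessageDigest md = md5();
        // 1. Node-local property: the traversal index.
        md.update(intBytes(traversalIndex));
        // 2. Chained outputs: the traversal index is fed in once more per chainable out-edge.
        for (int i = 0; i < chainableOutEdges; i++) {
            md.update(intBytes(traversalIndex));
        }
        byte[] hash = md.digest();
        // 3. Inputs: fold each upstream hash into this node's hash (illustrative combining step).
        for (byte[] other : inputHashes) {
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ other[j]);
            }
        }
        return hash;
    }

    static byte[] intBytes(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static MessageDigest md5() {
        try {
            return MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        // source -> sink, with the source's single out-edge chainable.
        byte[] source = nodeHash(0, 1, List.of());
        byte[] sink = nodeHash(1, 0, List.of(source));
        System.out.println("source: " + String.format("%032x", new BigInteger(1, source)));
        System.out.println("sink:   " + String.format("%032x", new BigInteger(1, sink)));
    }
}
```

Because the chained-output step feeds extra data into the digest, changing {{chainableOutEdges}} from 1 to 0 changes the node's hash even though nothing else about the node changed.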

*Problem*

Chaining behavior affects state compatibility, because the OperatorID of an 
operator depends on its chained output nodes. For example, a simple source->sink 
DAG with source and sink chained together is state-incompatible with an 
otherwise identical DAG with source and sink unchained (either because the 
parallelisms of the two operators were changed to be unequal or because chaining 
was disabled). This greatly limits the flexibility to break or join chains for 
performance tuning.

*Proposal*

Introduce a {{StreamGraphHasherV3}} that is agnostic to the chaining behavior 
of operators. It effectively just removes L227-235 (the step that hashes chained 
output nodes) of 
[StreamGraphHasherV2.java|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
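A sketch of the proposed change, under the same assumptions as a simplified model rather than Flink's code (MD5 as a stand-in hash function, hypothetical names such as {{HasherV3Sketch}} and {{chainingAware}}, an illustrative combining step). The {{chainingAware}} flag toggles the chained-output step, so {{true}} approximates V2 and {{false}} approximates the proposed V3, applied to the source->sink example from the problem statement.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch contrasting a V2-style hash with a chaining-agnostic V3.
 * Assumptions: MD5 stands in for Flink's hash function; the graph is the
 * two-node source -> sink example, described by plain arguments.
 */
public final class HasherV3Sketch {

    /** chainingAware = true approximates V2; false approximates the proposed V3. */
    static byte[] nodeHash(int traversalIndex, int chainableOutEdges,
                           List<byte[]> inputHashes, boolean chainingAware) {
        MessageDigest md = md5();
        md.update(intBytes(traversalIndex));
        if (chainingAware) {
            // The step the proposal removes: extra input per chainable out-edge.
            for (int i = 0; i < chainableOutEdges; i++) {
                md.update(intBytes(traversalIndex));
            }
        }
        byte[] hash = md.digest();
        // Fold in the inputs' hashes (illustrative combining step).
        for (byte[] other : inputHashes) {
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ other[j]);
            }
        }
        return hash;
    }

    /** Returns {sourceId, sinkId} for source -> sink, chained or unchained. */
    static byte[][] sourceSinkIds(boolean chained, boolean chainingAware) {
        byte[] source = nodeHash(0, chained ? 1 : 0, List.of(), chainingAware);
        byte[] sink = nodeHash(1, 0, List.of(source), chainingAware);
        return new byte[][] {source, sink};
    }

    static byte[] intBytes(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static MessageDigest md5() {
        try {
            return MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        for (boolean chainingAware : new boolean[] {true, false}) {
            boolean stable = Arrays.deepEquals(
                    sourceSinkIds(true, chainingAware), sourceSinkIds(false, chainingAware));
            System.out.println((chainingAware ? "V2-style" : "V3-style")
                    + " IDs unchanged by (un)chaining: " + stable);
        }
    }
}
```

Run as is, the V2-style line reports that the IDs changed and the V3-style line reports that they did not. Under the V2-style hash even the sink's ID changes, because the sink folds in the source's hash.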
 

This will not hurt the determinism of ID generation across job submissions as 
long as the stream graph topology does not change. And since newer versions of 
Flink have already adopted pure operator-level state recovery, it will not break 
state recovery across job submissions as long as both submissions use the same 
hasher.

This will, however, break cross-version state compatibility. So we can 
introduce a new option to enable HasherV3 in v1.19 and consider making it the 
default hasher in v2.0.
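One way such an opt-in could look, sketched under loud assumptions: the option key {{pipeline.operator-id-hasher}}, the {{OperatorIdHasher}} enum and {{selectHasher}} are hypothetical names, not existing Flink configuration or API. The point is only that defaulting to V2 keeps existing snapshots restorable until a user explicitly opts in to V3.

```java
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical sketch of an opt-in switch between the two hashers.
 * The option key and enum below are illustrative names, not Flink API.
 */
public final class HasherOptionSketch {

    /** Hypothetical choice of operator-ID hasher. */
    enum OperatorIdHasher { V2, V3 }

    /** Hypothetical configuration key. */
    static final String HASHER_KEY = "pipeline.operator-id-hasher";

    /**
     * Defaults to V2 so that jobs restoring from existing snapshots keep
     * their OperatorIDs unless the user explicitly opts in to V3.
     */
    static OperatorIdHasher selectHasher(Map<String, String> conf) {
        String value = conf.getOrDefault(HASHER_KEY, OperatorIdHasher.V2.name());
        return OperatorIdHasher.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(selectHasher(Map.of()));                 // prints V2
        System.out.println(selectHasher(Map.of(HASHER_KEY, "v3"))); // prints V3
    }
}
```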

Looking forward to suggestions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
