Zhanghao Chen created FLINK-33962:
-------------------------------------

             Summary: Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
                 Key: FLINK-33962
                 URL: https://issues.apache.org/jira/browse/FLINK-33962
             Project: Flink
          Issue Type: Improvement
          Components: API / Core
            Reporter: Zhanghao Chen

*Background*

Flink restores operator state from snapshots by matching OperatorIDs. Since Flink 1.2, {{StreamGraphHasherV2}} has been used to generate the OperatorID when no user-set uid exists. The generated OperatorID is deterministic with respect to:
 * node-local properties (the traversal ID in the BFS of the DAG)
 * chained output nodes
 * input node hashes

*Problem*

Chaining behavior affects state compatibility, because the OperatorID of an operator depends on its chained output nodes. For example, a simple source->sink DAG with source and sink chained together is state-incompatible with an otherwise identical DAG in which source and sink are unchained (either because the parallelisms of the two operators were changed to be unequal, or because chaining is disabled). This greatly limits the flexibility to break or join chains for performance tuning.

*Proposal*

Introduce {{StreamGraphHasherV3}}, which is agnostic to the chaining behavior of operators. Effectively, it just removes L227-235 of [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java at master · apache/flink (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].

This will not hurt the determinism of ID generation across job submissions as long as the stream graph topology doesn't change. Since new versions of Flink have already adopted pure operator-level state recovery, it will also not break state recovery across job submissions as long as both submissions use the same hasher.

It will, however, break cross-version state compatibility. So we can introduce a new option to enable HasherV3 in v1.19 and consider making it the default hasher in v2.0.

Looking forward to suggestions on this.
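
To illustrate the difference between a chaining-aware and a chaining-agnostic hash, here is a minimal toy model in Java. It is not Flink's actual hasher code: the class {{ChainingAgnosticHashSketch}}, its methods, and the use of SHA-256 are all assumptions made for illustration. It only mirrors the three inputs listed above (traversal ID, chained outputs, input hashes) for the source node of a source->sink DAG.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model; names and hashing scheme are illustrative, not Flink's.
public class ChainingAgnosticHashSketch {

    // Hashes the traversal index, optionally the number of chained outputs,
    // and then every input hash, in that order.
    static byte[] nodeHash(int traversalIndex, int chainedOutputs,
                           boolean includeChaining, List<byte[]> inputHashes) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            md.update(ByteBuffer.allocate(4).putInt(traversalIndex).array());
            if (includeChaining) {
                // Chaining-aware variant: chained outputs feed into the ID.
                md.update(ByteBuffer.allocate(4).putInt(chainedOutputs).array());
            }
            for (byte[] in : inputHashes) {
                md.update(in);
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // ID of the source node in a source->sink DAG (no inputs, traversal index 0).
    static byte[] sourceHash(boolean chainedWithSink, boolean includeChaining) {
        return nodeHash(0, chainedWithSink ? 1 : 0, includeChaining, List.of());
    }

    public static void main(String[] args) {
        // Chaining-aware: the ID changes when the chain is broken.
        System.out.println("chaining-aware IDs equal:    "
                + Arrays.equals(sourceHash(true, true), sourceHash(false, true)));
        // Chaining-agnostic: the ID is the same either way.
        System.out.println("chaining-agnostic IDs equal: "
                + Arrays.equals(sourceHash(true, false), sourceHash(false, false)));
    }
}
```

In this toy model the chaining-aware variant yields different source IDs for the chained and unchained graphs, while the chaining-agnostic variant yields the same ID, which is the property the proposal is after. Because the sink's hash would include the source's hash as an input, a change in the source ID would propagate downstream as well.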