[ https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815195#comment-17815195 ]
Zhanghao Chen commented on FLINK-33962:
---------------------------------------

Hi [~srichter], I'm trying to make the hasher upgrade backwards-compatible by reviving the idea of FLINK-5290, which you authored back in Flink 1.2. The details can be found in the linked FLIP doc. Could you kindly help review it? Thanks a lot in advance.

> Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33962
>                 URL: https://issues.apache.org/jira/browse/FLINK-33962
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>            Reporter: Zhanghao Chen
>            Priority: Major
>
> *Background*
> Flink restores operator state from snapshots by matching operatorIDs.
> Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID generation
> when no user-set uid exists. The generated OperatorID is deterministic with
> respect to:
> * node-local properties (the traversal ID in the BFS over the stream graph)
> * chained output nodes
> * input node hashes
>
> *Problem*
> The chaining behavior affects state compatibility, because the OperatorID
> of an operator depends on its chained output nodes. For example, a simple
> source->sink DAG with source and sink chained together is state-incompatible
> with an otherwise identical DAG with source and sink unchained (either
> because the parallelisms of the two operators are changed to be unequal or
> because chaining is disabled). This greatly limits the flexibility to break
> or join chains for performance tuning.
>
> *Proposal*
> Introduce {{StreamGraphHasherV3}}, which is agnostic to the chaining
> behavior of operators. This effectively just removes L227-235 of
> [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java at master · apache/flink (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
>
> This will not hurt the determinism of ID generation across job submissions
> as long as the stream graph topology doesn't change. Since new versions of
> Flink have already adopted pure operator-level state recovery, it will also
> not break state recovery across job submissions as long as both submissions
> use the same hasher.
>
> This will, however, break cross-version state compatibility. So we can
> introduce a new option to enable HasherV3 in v1.19 and consider making it
> the default hasher in v2.0.
>
> Looking forward to suggestions on this.
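
To illustrate the source->sink example from the issue description, here is a minimal, simplified sketch and not the actual Flink implementation. The class {{ChainingAgnosticHashSketch}}, the method {{nodeHash}}, and its parameters are hypothetical names introduced only for illustration. MD5 from the JDK stands in for the hash function that Flink really uses. The {{chainingAware}} flag is an assumption that models the difference between a V2-style hasher, which mixes chained output nodes into the hash, and the proposed V3-style hasher, which skips that step.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

public class ChainingAgnosticHashSketch {

    /**
     * Hypothetical, simplified node hash. It mixes in the traversal ID,
     * optionally one extra entry per chainable output (V2-style), and
     * finally the hashes of all input nodes.
     */
    static byte[] nodeHash(int traversalId, int chainableOutputs,
                           List<byte[]> inputHashes, boolean chainingAware) {
        MessageDigest digest;
        try {
            // MD5 is only a stand-in that yields 16 bytes; it is an assumption.
            digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }

        // Node-local property: the traversal ID of the node.
        digest.update(intBytes(traversalId));

        // Chaining-dependent part. Skipping it is the idea of the proposal.
        if (chainingAware) {
            for (int i = 0; i < chainableOutputs; i++) {
                digest.update(intBytes(traversalId));
            }
        }

        byte[] hash = digest.digest();

        // Combine with the input node hashes so the ID reflects the upstream topology.
        for (byte[] input : inputHashes) {
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ input[j]);
            }
        }
        return hash;
    }

    static byte[] intBytes(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    public static void main(String[] args) {
        List<byte[]> noInputs = List.of();

        // Source node (traversal ID 0) with the sink chained (1) vs unchained (0).
        byte[] awareChained = nodeHash(0, 1, noInputs, true);
        byte[] awareUnchained = nodeHash(0, 0, noInputs, true);
        byte[] agnosticChained = nodeHash(0, 1, noInputs, false);
        byte[] agnosticUnchained = nodeHash(0, 0, noInputs, false);

        System.out.println("chaining-aware IDs equal:    "
                + Arrays.equals(awareChained, awareUnchained));
        System.out.println("chaining-agnostic IDs equal: "
                + Arrays.equals(agnosticChained, agnosticUnchained));
    }
}
```

In this sketch the chaining-aware variant yields different IDs for the chained and unchained source, while the chaining-agnostic variant yields the same ID in both cases, which is the property the proposal relies on for restoring state after a chain is broken or joined.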