[ https://issues.apache.org/jira/browse/BEAM-9824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-9824:
-------------------------------
    Fix Version/s: 2.22.0 (was: 2.21.0)

> Multiple reshuffles are ignored in some cases on Flink batch runner.
> --------------------------------------------------------------------
>
> Key: BEAM-9824
> URL: https://issues.apache.org/jira/browse/BEAM-9824
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0, 2.20.0
> Reporter: David Morávek
> Assignee: David Morávek
> Priority: Major
> Fix For: 2.22.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Multiple reshuffles are ignored in some cases on the Flink batch runner. This
> may lead to a huge performance penalty in IO connectors (when reshuffling
> splits).
>
> In the Flink optimizer, when we `.rebalance()` a dataset, its output channel
> is marked as `FORCED_REBALANCED`. When we chain this with another
> `.rebalance()`, the latter is ignored because its source is already
> `FORCED_REBALANCED`, so the requested property is already met. This is
> correct behaviour, because rebalance is idempotent.
>
> When we include a `flatMap` between the rebalances
> (`.rebalance().flatMap(...).rebalance()`), we need to reshuffle again,
> because the dataset distribution may have changed (e.g. a `flatMap` can emit
> an unbounded number of records from a single element). Unfortunately, the
> `flatMap` output is still incorrectly marked as `FORCED_REBALANCED`, so the
> second reshuffle gets ignored.
>
> This especially affects IO connectors: `FileIO.match()` returns a reshuffled
> list of matched files -> we split each file into ranges -> **reshuffle** ->
> read. Ignoring the second reshuffle leads to a huge performance degradation
> (from 5 minutes to 2 hours in one of our production pipelines).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
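The property-propagation problem described in the issue can be illustrated with a toy model. This is a sketch under stated assumptions, not Flink's optimizer code. The names `Partitioning`, `Op` and `physicalShuffles` are hypothetical. Only `FORCED_REBALANCED` mirrors the property named in the issue. The model assumes a linear plan in which a rebalance is skipped whenever its input already carries `FORCED_REBALANCED`. The `propagateThroughFlatMap` flag switches between the buggy behaviour, where `flatMap` keeps its input's property, and the expected behaviour, where it does not. The flag does not describe how the actual fix was implemented.

```java
import java.util.List;

public class RebalancePropertyModel {

    // Hypothetical, simplified stand-in for an optimizer's global partitioning property.
    enum Partitioning { ANY, FORCED_REBALANCED }

    // Hypothetical operator kinds in a linear (single-input) plan.
    enum Op { SOURCE, REBALANCE, FLAT_MAP }

    /**
     * Counts how many rebalances in the plan would actually ship data.
     * If propagateThroughFlatMap is true, flatMap keeps its input's
     * FORCED_REBALANCED property (the buggy behaviour from the issue).
     */
    static int physicalShuffles(List<Op> plan, boolean propagateThroughFlatMap) {
        Partitioning current = Partitioning.ANY;
        int shuffles = 0;
        for (Op op : plan) {
            switch (op) {
                case SOURCE:
                    current = Partitioning.ANY;
                    break;
                case REBALANCE:
                    // Skipped if the input already satisfies the requested property (idempotence).
                    if (current != Partitioning.FORCED_REBALANCED) {
                        shuffles++;
                    }
                    current = Partitioning.FORCED_REBALANCED;
                    break;
                case FLAT_MAP:
                    // A flatMap can emit any number of records per input element,
                    // so its output distribution is unknown unless we (wrongly) propagate.
                    if (!propagateThroughFlatMap) {
                        current = Partitioning.ANY;
                    }
                    break;
            }
        }
        return shuffles;
    }

    public static void main(String[] args) {
        List<Op> chained = List.of(Op.SOURCE, Op.REBALANCE, Op.REBALANCE);
        List<Op> withFlatMap = List.of(Op.SOURCE, Op.REBALANCE, Op.FLAT_MAP, Op.REBALANCE);

        System.out.println(physicalShuffles(chained, false));     // prints 1: second rebalance correctly skipped
        System.out.println(physicalShuffles(withFlatMap, true));  // prints 1: second rebalance wrongly skipped
        System.out.println(physicalShuffles(withFlatMap, false)); // prints 2: reshuffle after flatMap kept
    }
}
```

Chaining two rebalances directly still collapses to one shuffle, which is the correct idempotent case. Putting a `flatMap` between them should restore the second shuffle. In the IO connector case, that second shuffle is the one that spreads file ranges across workers.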