[ https://issues.apache.org/jira/browse/BEAM-9824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía updated BEAM-9824:
-------------------------------
    Fix Version/s:     (was: 2.21.0)
                   2.22.0

> Multiple reshuffles are ignored in some cases on Flink batch runner.
> --------------------------------------------------------------------
>
>                 Key: BEAM-9824
>                 URL: https://issues.apache.org/jira/browse/BEAM-9824
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.19.0, 2.20.0
>            Reporter: David Morávek
>            Assignee: David Morávek
>            Priority: Major
>             Fix For: 2.22.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Multiple reshuffles are ignored in some cases on the Flink batch runner. This
> can lead to a huge performance penalty in IO connectors (when reshuffling splits).
> In the Flink optimizer, when we `.rebalance()` a dataset, its output channel is
> marked as `FORCED_REBALANCED`. When we chain this with another
> `.rebalance()`, the latter is ignored because its source is already
> `FORCED_REBALANCED`, so the requested property is already met. This is correct
> behaviour, because rebalance is idempotent.
> When we include a `flatMap` between the rebalances
> (`.rebalance().flatMap(...).rebalance()`), we need to reshuffle again, because
> the dataset distribution may have changed (e.g. a single element can emit an
> unbounded stream of outputs). Unfortunately, the `flatMap` output is still
> incorrectly marked as `FORCED_REBALANCED`, and the second reshuffle gets
> ignored.
> This especially affects IO connectors: `FileIO.match()` returns a reshuffled
> list of matched files -> we split each file into ranges -> **reshuffle** ->
> read. Ignoring the second reshuffle leads to a huge performance degradation
> (from 5m to 2h in one of our production pipelines).
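
The optimizer behaviour described above can be illustrated with a toy model. This is a hypothetical Java sketch, not Flink's actual optimizer code: the `Partitioning` and `OpType` enums and the `countShuffles` helper are invented for illustration, and only the `FORCED_REBALANCED` name comes from the description. It counts how many rebalances in a linear chain actually ship data, with `flatMap` either forwarding its input's property (the buggy behaviour) or resetting it (the fix).

```java
import java.util.List;

/**
 * Toy model (hypothetical, not Flink's optimizer code) of how a plan
 * decides whether a rebalance needs a real network shuffle.
 */
public class RebalanceModel {

    /** Simplified partitioning property of an operator's output. */
    enum Partitioning { ANY, FORCED_REBALANCED }

    /** The only operator kinds this toy model knows about. */
    enum OpType { SOURCE, REBALANCE, FLAT_MAP }

    /**
     * Walks a linear chain of operators and returns how many rebalances
     * actually ship data. When fixed is false, flatMap (incorrectly)
     * forwards its input's FORCED_REBALANCED property, as in the bug.
     */
    static int countShuffles(List<OpType> chain, boolean fixed) {
        Partitioning current = Partitioning.ANY;
        int shuffles = 0;
        for (OpType op : chain) {
            switch (op) {
                case SOURCE:
                    current = Partitioning.ANY;
                    break;
                case REBALANCE:
                    // Rebalance is idempotent: skip the shuffle if the input
                    // is already marked as rebalanced.
                    if (current != Partitioning.FORCED_REBALANCED) {
                        shuffles++;
                    }
                    current = Partitioning.FORCED_REBALANCED;
                    break;
                case FLAT_MAP:
                    // flatMap may emit any number of records per input element,
                    // so the distribution guarantee should not survive it.
                    if (fixed) {
                        current = Partitioning.ANY;
                    }
                    break;
            }
        }
        return shuffles;
    }

    public static void main(String[] args) {
        List<OpType> doubleRebalance =
                List.of(OpType.SOURCE, OpType.REBALANCE, OpType.REBALANCE);
        List<OpType> withFlatMap =
                List.of(OpType.SOURCE, OpType.REBALANCE, OpType.FLAT_MAP, OpType.REBALANCE);
        System.out.println(countShuffles(doubleRebalance, true)); // 1: second rebalance is redundant
        System.out.println(countShuffles(withFlatMap, false));    // 1: second rebalance wrongly dropped
        System.out.println(countShuffles(withFlatMap, true));     // 2: both rebalances ship data
    }
}
```

With the buggy propagation, the chain containing a `flatMap` ships data only once, which matches the ignored second reshuffle described in the issue; resetting the property after `flatMap` restores the second shuffle while still collapsing two directly chained rebalances.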

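To see why the ignored reshuffle hurts the `FileIO` pattern above, consider a hypothetical simulation in which the first reshuffle assigns whole files round-robin to workers and the second reshuffle (when it runs) spreads individual ranges round-robin. Round-robin is a simplified stand-in for Beam's reshuffle, and the `SplitSkew` class and the file sizes in `main` are invented for illustration; they are not taken from the production pipeline mentioned in the issue.

```java
import java.util.Arrays;

/**
 * Hypothetical simulation: files are assigned round-robin to workers
 * (standing in for the first reshuffle), then each file is split into ranges.
 * Compares the busiest worker's load with and without a second
 * round-robin redistribution of the ranges.
 */
public class SplitSkew {

    /** Busiest worker's range count when ranges stay on the worker that owns the file. */
    static int maxLoadWithoutReshuffle(int[] rangesPerFile, int workers) {
        int[] load = new int[workers];
        for (int f = 0; f < rangesPerFile.length; f++) {
            // Files are spread round-robin; every range of file f is then
            // read by the same worker because nothing moves them.
            load[f % workers] += rangesPerFile[f];
        }
        return Arrays.stream(load).max().orElse(0);
    }

    /** Busiest worker's range count when individual ranges are redistributed round-robin. */
    static int maxLoadWithReshuffle(int[] rangesPerFile, int workers) {
        int[] load = new int[workers];
        int next = 0;
        for (int ranges : rangesPerFile) {
            for (int r = 0; r < ranges; r++) {
                // The second reshuffle spreads individual ranges, not whole files.
                load[next] += 1;
                next = (next + 1) % workers;
            }
        }
        return Arrays.stream(load).max().orElse(0);
    }

    public static void main(String[] args) {
        int[] rangesPerFile = {100, 1, 1, 1}; // one large file, three tiny ones
        System.out.println(maxLoadWithoutReshuffle(rangesPerFile, 4)); // 100
        System.out.println(maxLoadWithReshuffle(rangesPerFile, 4));    // 26
    }
}
```

With one large file and three tiny ones on four workers, skipping the second reshuffle leaves a single worker reading 100 ranges, while redistributing the ranges caps the busiest worker at 26 (103 ranges spread over 4 workers).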


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
