David Morávek created BEAM-9824:
-----------------------------------

             Summary: Multiple reshuffles are ignored in some cases on Flink batch runner.
                 Key: BEAM-9824
                 URL: https://issues.apache.org/jira/browse/BEAM-9824
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
    Affects Versions: 2.19.0, 2.20.0
            Reporter: David Morávek
            Assignee: David Morávek
             Fix For: 2.21.0


Multiple reshuffles are ignored in some cases on the Flink batch runner. This may 
lead to a huge performance penalty in IO connectors (when reshuffling splits).

In the Flink optimizer, when we `.rebalance()` a dataset, its output channel is 
marked as `FORCED_REBALANCED`. When we chain this with another `.rebalance()`, the 
latter is ignored because its source is already `FORCED_REBALANCED`, so the 
requested property is already met. This is correct behaviour, because rebalance is 
idempotent.
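To illustrate why eliding the second rebalance is safe, here is a minimal sketch. It assumes a toy rebalance that deals elements round-robin over a fixed number of partitions. This is not Flink's actual `RebalancePartitioner`, and the class and method names are hypothetical. Once the data has been rebalanced, running the same rebalance again produces the same partition sizes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical toy model (not Flink code): "rebalance" deals every element
 * round-robin over numPartitions output partitions.
 */
final class RebalanceIdempotence {

    /** Redistributes all elements round-robin across numPartitions partitions. */
    static List<List<Integer>> rebalance(List<List<Integer>> input, int numPartitions) {
        List<List<Integer>> out = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            out.add(new ArrayList<>());
        }
        int next = 0;
        for (List<Integer> partition : input) {
            for (Integer element : partition) {
                out.get(next).add(element);
                next = (next + 1) % numPartitions;
            }
        }
        return out;
    }

    /** Number of elements held by each partition. */
    static int[] sizes(List<List<Integer>> partitions) {
        int[] result = new int[partitions.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = partitions.get(i).size();
        }
        return result;
    }

    /** Fully skewed input: every element sits in partition 0. */
    static List<List<Integer>> skewedInput(int numPartitions, int numElements) {
        List<List<Integer>> input = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            input.add(new ArrayList<>());
        }
        for (int i = 0; i < numElements; i++) {
            input.get(0).add(i);
        }
        return input;
    }

    public static void main(String[] args) {
        List<List<Integer>> once = rebalance(skewedInput(4, 10), 4);
        List<List<Integer>> twice = rebalance(once, 4);
        System.out.println(Arrays.toString(sizes(once)));  // [3, 3, 2, 2]
        System.out.println(Arrays.toString(sizes(twice))); // [3, 3, 2, 2]
    }
}
```

The second pass changes which element lands where, but not how much work each partition gets. In this model, that is the sense in which a rebalance is idempotent.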

When we include a `flatMap` in between the rebalances, as in 
`.rebalance().flatMap(...).rebalance()`, we need to reshuffle again, because the 
dataset distribution may have changed (e.g. a single input element can emit an 
unbounded stream of records). Unfortunately, the `flatMap` output is still 
incorrectly marked as `FORCED_REBALANCED`, so the second reshuffle gets ignored.
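The planning rule described above can be modelled in a few lines. This is a hypothetical toy planner, not the Flink optimizer API. `RebalancePlanner`, `Property`, `Op` and `countShuffles` are illustrative names. The model assumes that the correct behaviour is for `flatMap` to reset the partitioning property, which is a sketch of the intended semantics and not the actual patch.

```java
import java.util.List;

/**
 * Hypothetical toy planner (NOT the Flink optimizer API). It mimics one rule:
 * a requested rebalance is elided when its input is already FORCED_REBALANCED.
 */
final class RebalancePlanner {

    enum Property { ANY, FORCED_REBALANCED }

    enum Op { REBALANCE, FLAT_MAP }

    /**
     * Plans a linear chain of operators and returns how many physical shuffles
     * survive. With buggyFlatMap = true, flatMap forwards its input's property
     * (the reported behaviour); otherwise flatMap resets the property to ANY.
     */
    static int countShuffles(List<Op> chain, boolean buggyFlatMap) {
        Property current = Property.ANY;
        int shuffles = 0;
        for (Op op : chain) {
            switch (op) {
                case REBALANCE:
                    if (current != Property.FORCED_REBALANCED) {
                        shuffles++; // requested property not met: keep the shuffle
                        current = Property.FORCED_REBALANCED;
                    } // else: property already met, so the shuffle is elided
                    break;
                case FLAT_MAP:
                    // flatMap may emit any number of records per input element,
                    // so only the buggy model keeps the FORCED_REBALANCED mark.
                    current = buggyFlatMap ? current : Property.ANY;
                    break;
            }
        }
        return shuffles;
    }

    public static void main(String[] args) {
        List<Op> doubleRebalance = List.of(Op.REBALANCE, Op.REBALANCE);
        List<Op> withFlatMap = List.of(Op.REBALANCE, Op.FLAT_MAP, Op.REBALANCE);
        System.out.println(countShuffles(doubleRebalance, false)); // 1
        System.out.println(countShuffles(withFlatMap, true));      // 1: second reshuffle lost
        System.out.println(countShuffles(withFlatMap, false));     // 2
    }
}
```

Resetting the property is the conservative choice. A `flatMap` cannot promise anything about how many records each input produces, so the planner has to assume the distribution is unknown.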

This especially affects IO connectors: `FileIO.match()` returns a reshuffled 
list of matched files -> we split each file into ranges -> **reshuffle** -> 
read. Ignoring the second reshuffle leads to a huge performance degradation 
(5m -> 2h in one of our production pipelines).
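A rough load model shows why dropping the second reshuffle is so costly. This is a hypothetical sketch, not Beam or Flink code. `SplitReshuffleLoad` and `workerLoad` are illustrative names, and the file sizes are made up rather than taken from the production pipeline. The model makes three assumptions: files are dealt round-robin to workers (the first reshuffle), each file is split into equally expensive ranges, and a stage takes as long as its busiest worker.

```java
import java.util.Arrays;

/**
 * Hypothetical load model (not Beam/Flink code) of:
 * match files -> reshuffle -> split into ranges -> reshuffle -> read.
 * Each range is one unit of read work; a worker's load is its range count.
 */
final class SplitReshuffleLoad {

    /**
     * Returns the number of ranges each worker reads.
     *
     * @param rangesPerFile   how many ranges each matched file is split into
     * @param workers         parallelism
     * @param reshuffleRanges whether the second reshuffle (of ranges) runs
     */
    static int[] workerLoad(int[] rangesPerFile, int workers, boolean reshuffleRanges) {
        int[] load = new int[workers];
        int nextRangeTarget = 0;
        for (int file = 0; file < rangesPerFile.length; file++) {
            // First reshuffle: files are dealt round-robin to workers.
            int fileOwner = file % workers;
            for (int r = 0; r < rangesPerFile[file]; r++) {
                if (reshuffleRanges) {
                    // Second reshuffle: ranges are dealt round-robin as well.
                    load[nextRangeTarget]++;
                    nextRangeTarget = (nextRangeTarget + 1) % workers;
                } else {
                    // Reshuffle ignored: every range is read where its file landed.
                    load[fileOwner]++;
                }
            }
        }
        return load;
    }

    public static void main(String[] args) {
        int[] rangesPerFile = {100, 4, 4, 4}; // one large file, three small ones
        System.out.println(Arrays.toString(workerLoad(rangesPerFile, 4, false))); // [100, 4, 4, 4]
        System.out.println(Arrays.toString(workerLoad(rangesPerFile, 4, true)));  // [28, 28, 28, 28]
    }
}
```

In this model the busiest worker handles 100 ranges instead of 28. The read stage is therefore bounded by one worker processing the entire large file, even though the other workers sit idle.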



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
