Hello Flinkers,

we have run into unexpected behaviour with chained Reshuffles in Apache
Beam's Flink runner (batch).

In the Flink optimizer, when we `.rebalance()` a dataset, its output channel is
marked as `FORCED_REBALANCED`. When we chain this with another
`.rebalance()`, the latter is skipped because its input is already
`FORCED_REBALANCED`, so the requested property is already met. This is
correct behaviour because rebalance is idempotent.

When we put a `flatMap` between the rebalances, as in
`.rebalance().flatMap(...).rebalance()`, we need to reshuffle again,
because the data distribution may have changed (e.g. a single input element
can emit arbitrarily many output elements). Unfortunately, the `flatMap`
output is still incorrectly marked as `FORCED_REBALANCED`, and the second
rebalance gets ignored.
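To make the reasoning above concrete, here is a toy model in plain Java. It is hypothetical: `RebalanceElisionModel`, `Partitioning`, `Op` and `executedRebalances` are invented for illustration and are not Flink's optimizer classes. It tracks a single partitioning property along a linear chain of operators and elides a rebalance whose input is already `FORCED_REBALANCED`. With `flatMapPreservesRebalance = true` it mimics the behaviour reported here; with `false` the flatMap resets the property, which is what we would expect.

```java
import java.util.List;

public class RebalanceElisionModel {
    // Simplified stand-in for the optimizer's global partitioning property.
    enum Partitioning { ANY, FORCED_REBALANCED }

    // The only two operators the model knows about.
    enum Op { REBALANCE, FLAT_MAP }

    /**
     * Walks a linear chain of operators and returns how many rebalances
     * actually ship data. A rebalance is elided when its input already
     * carries FORCED_REBALANCED (rebalance is idempotent).
     *
     * @param flatMapPreservesRebalance true reproduces the reported behaviour
     *        (flatMap forwards FORCED_REBALANCED); false resets it to ANY.
     */
    static int executedRebalances(List<Op> chain, boolean flatMapPreservesRebalance) {
        Partitioning current = Partitioning.ANY;
        int executed = 0;
        for (Op op : chain) {
            switch (op) {
                case REBALANCE:
                    if (current != Partitioning.FORCED_REBALANCED) {
                        executed++; // requested property not met: ship data
                    }
                    current = Partitioning.FORCED_REBALANCED;
                    break;
                case FLAT_MAP:
                    if (!flatMapPreservesRebalance) {
                        // flatMap may skew the distribution, so forget the property
                        current = Partitioning.ANY;
                    }
                    break;
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        List<Op> chain = List.of(Op.REBALANCE, Op.FLAT_MAP, Op.REBALANCE);
        System.out.println("reported behaviour: " + executedRebalances(chain, true));
        System.out.println("expected behaviour: " + executedRebalances(chain, false));
    }
}
```

For `.rebalance().flatMap(...).rebalance()` the model ships data once under the reported behaviour and twice when flatMap resets the property; for two directly chained rebalances it ships once either way.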

We have worked around this by replacing the `.rebalance()` with an identity
map function that carries the `Optimizer.HINT_SHIP_STRATEGY_REPARTITION`
ship-strategy hint.
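A sketch of that workaround in the Flink DataSet API, assuming Flink 1.x with `flink-java` and `flink-optimizer` on the classpath. The names `ReshuffleWorkaround`, `FanOut`, `IdentityMap`, `reshuffle` and `buildPipeline` are hypothetical; only the idea of passing `Optimizer.HINT_SHIP_STRATEGY` = `Optimizer.HINT_SHIP_STRATEGY_REPARTITION` to an identity map via `withParameters(...)` reflects the approach described above. The hint asks the optimizer to repartition the map's input rather than trusting the inherited `FORCED_REBALANCED` property.

```java
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.util.Collector;

public class ReshuffleWorkaround {

    // Hypothetical flatMap that fans each input element out into `fanOut` elements,
    // i.e. the kind of operator that can skew the data distribution.
    static final class FanOut implements FlatMapFunction<Long, Long> {
        private final int fanOut;

        FanOut(int fanOut) {
            this.fanOut = fanOut;
        }

        @Override
        public void flatMap(Long value, Collector<Long> out) {
            for (int i = 0; i < fanOut; i++) {
                out.collect(value * fanOut + i);
            }
        }
    }

    // Identity map; its only purpose is to carry the ship-strategy hint.
    static final class IdentityMap implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            return value;
        }
    }

    // Used instead of a second .rebalance(): the hint forces a repartition
    // of the map's input edge.
    static DataSet<Long> reshuffle(DataSet<Long> input) {
        Configuration hints = new Configuration();
        hints.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);
        return input.map(new IdentityMap()).withParameters(hints);
    }

    // .rebalance().flatMap(...) followed by the hinted identity map.
    static DataSet<Long> buildPipeline(ExecutionEnvironment env) {
        DataSet<Long> source = env.generateSequence(0, 9);
        return reshuffle(source.rebalance().flatMap(new FanOut(100)));
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Long> result = buildPipeline(env).collect();
        System.out.println("elements: " + result.size());
    }
}
```

The identity map does not change the data; whether the repartition actually happens can be checked in the plan returned by `env.getExecutionPlan()` before running the job.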

I have a feeling that this is just a workaround and that the issue should
be fixed in the Flink optimizer itself.

Relevant Beam JIRA: https://issues.apache.org/jira/browse/BEAM-9824

WDYT?

Thanks,

D.
