Hello Flinkers, we have run into unexpected behaviour with chained Reshuffles in Apache Beam's Flink runner (batch).
In the Flink optimizer, when we call `.rebalance()` on a DataSet, its output channel is marked as `FORCED_REBALANCED`. When we chain this with another `.rebalance()`, the latter is ignored because its source is already `FORCED_REBALANCED`, so the requested property is already met. This is correct behaviour, because rebalance is idempotent.

When we include a `flatMap` between the rebalances, i.e. `.rebalance().flatMap(...).rebalance()`, we need to reshuffle again, because the distribution of the dataset may have changed (e.g. you can possibly emit an unbounded stream from a single element). Unfortunately, the `flatMap` output is still incorrectly marked as `FORCED_REBALANCED`, and the second reshuffle gets ignored.

We have worked around this by replacing the repartition with an identity map function with `Optimizer.HINT_SHIP_STRATEGY_REPARTITION`. I have a feeling that this is just a workaround and that it should be fixed in the Flink optimizer itself.

Relevant Beam JIRA: https://issues.apache.org/jira/browse/BEAM-9824

WDYT?

Thanks,
D.
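
P.S. To make the skew concrete, here is a minimal, Flink-free sketch in plain Java. It does not use the Flink API or the optimizer. It only simulates round-robin partitioning to show why the distribution after a `flatMap` can differ from the one before it. All names (`RebalanceSkewSketch`, `rebalance`, `flatMap`, `expand`) are hypothetical and exist only for this illustration, and the 1000x expansion factor is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class RebalanceSkewSketch {

    // Simplified stand-in for rebalance(): deal all elements round-robin
    // over `parallelism` partitions. This is NOT Flink's implementation.
    public static <T> List<List<T>> rebalance(List<List<T>> partitions, int parallelism) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            out.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> partition : partitions) {
            for (T element : partition) {
                out.get(next).add(element);
                next = (next + 1) % parallelism;
            }
        }
        return out;
    }

    // Partition-local flatMap: each partition is transformed in place,
    // so no data moves between partitions.
    public static <T, R> List<List<R>> flatMap(List<List<T>> partitions, Function<T, List<R>> fn) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> partition : partitions) {
            List<R> mapped = new ArrayList<>();
            for (T element : partition) {
                mapped.addAll(fn.apply(element));
            }
            out.add(mapped);
        }
        return out;
    }

    // Number of records held by each partition.
    public static <T> List<Integer> sizes(List<List<T>> partitions) {
        List<Integer> result = new ArrayList<>();
        for (List<T> partition : partitions) {
            result.add(partition.size());
        }
        return result;
    }

    // Hypothetical skewed function: element 0 expands into 1000 records,
    // every other element is passed through once.
    public static List<Integer> expand(Integer x) {
        int copies = (x == 0) ? 1000 : 1;
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < copies; i++) {
            out.add(x);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = Arrays.asList(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7));

        // rebalance().flatMap(...) with the second rebalance dropped
        List<List<Integer>> skewed = flatMap(rebalance(input, 4), RebalanceSkewSketch::expand);
        System.out.println(sizes(skewed));               // [1001, 2, 2, 2]

        // rebalance().flatMap(...).rebalance() with the second rebalance applied
        System.out.println(sizes(rebalance(skewed, 4))); // [252, 252, 252, 251]
    }
}
```

In this toy model, dropping the second rebalance leaves one partition with almost all of the records, which is the situation we end up in when the `flatMap` output keeps the `FORCED_REBALANCED` marker.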