[ https://issues.apache.org/jira/browse/FLINK-21936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307388#comment-17307388 ]

Arvid Heise commented on FLINK-21936:
-------------------------------------

A completely different approach would be possible with dynamic rescaling (epoch-based). We would drain the recovered data (with the old parallelism) and then rewire from source to sink. However, that feels like Flink 3.0.

> Disable checkpointing of inflight data in pointwise connections for unaligned checkpoints
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-21936
>                 URL: https://issues.apache.org/jira/browse/FLINK-21936
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Arvid Heise
>            Assignee: Arvid Heise
>            Priority: Major
>
> We currently do not have any hard guarantees on pointwise connections
> regarding data consistency. However, since data was implicitly partitioned
> the same way as by any preceding source or keyby, some users relied on this
> behavior to divide compute-intensive tasks into smaller chunks while
> depending on ordering guarantees.
>
> As long as the parallelism does not change, unaligned checkpoints (UC)
> retain these properties. With the implementation of rescaling for UC
> (FLINK-19801), that has changed. For most exchanges, there is a meaningful
> way to reassign state from one channel to another (even in random order).
> For some exchanges, the mapping is ambiguous and requires post-filtering.
> However, for pointwise connections, reassignment is impossible while
> retaining these properties.
>
> Consider {{source -> keyby -> task1 -> forward -> task2}}. Now, if we want
> to rescale from parallelism p = 1 to p = 2, the records inside the keyby
> channels suddenly need to be divided into two channels according to their
> keygroups. That is easily possible by using the keygroup ranges of the
> operators and a way to determine the key (group) of the record (independent
> of the actual approach). For the forward channel, we completely lack the key
> context.
> No record in the forward channel has any keygroup assigned; it is also not
> possible to calculate it, as there is no guarantee that the key is still
> present.
>
> The root cause for this limitation is the conceptual mismatch between what
> we provide and what some users assume we provide (or what we assume the
> users assume). For example, it is impossible to use (keyed) state in task2
> right now, because there is no key context, yet we still want to guarantee
> ordering with respect to that key context.
>
> For 1.13, the easiest solution is to disable channel state in pointwise
> connections. For any non-trivial application with at least one shuffle, the
> number of pointwise channels (linear in p) is quickly dwarfed by the number
> of all-to-all channels (quadratic in p). I'd add some alternative ideas to
> the discussion.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
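The key-group reassignment described in the issue for the keyby channels, and the reason it breaks down for the forward channel, can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not Flink's implementation: the CRC32 hash in {{key_group_for}} is a hypothetical stand-in for Flink's hash-based key-group assignment, {{MAX_PARALLELISM = 128}} is an assumed value, and {{Record}}, {{operator_index}} and {{redistribute}} are hypothetical names. The subtask formula {{key_group * parallelism // max_parallelism}} is modeled on how Flink maps contiguous key-group ranges to subtasks.

```python
import zlib
from dataclasses import dataclass
from typing import Optional

MAX_PARALLELISM = 128  # assumed value, for illustration only


def key_group_for(key: str, max_parallelism: int = MAX_PARALLELISM) -> int:
    # Hypothetical stand-in for Flink's hash-based key-group assignment;
    # CRC32 is used only because it is deterministic across runs.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index(key_group: int, parallelism: int,
                   max_parallelism: int = MAX_PARALLELISM) -> int:
    # Range partitioning: each subtask owns a contiguous range of key groups.
    return key_group * parallelism // max_parallelism


@dataclass(frozen=True)
class Record:
    payload: str
    key: Optional[str]  # None models a record whose key can no longer be derived


def redistribute(in_flight: list[Record], new_parallelism: int) -> dict[int, list[Record]]:
    """Reassign one channel's recovered in-flight records to the subtasks of the new parallelism."""
    targets: dict[int, list[Record]] = {i: [] for i in range(new_parallelism)}
    for record in in_flight:
        if record.key is None:
            # Forward-channel case: without a key there is no correct target subtask.
            raise ValueError(f"cannot reassign keyless record {record.payload!r}")
        subtask = operator_index(key_group_for(record.key), new_parallelism)
        # Appending in recovery order preserves the relative order of records per key.
        targets[subtask].append(record)
    return targets


if __name__ == "__main__":
    # Rescale a keyby channel from p = 1 to p = 2.
    keyby_channel = [Record("a1", "a"), Record("b1", "b"), Record("a2", "a")]
    for subtask, records in redistribute(keyby_channel, 2).items():
        print(subtask, [r.payload for r in records])

    # A forward channel carries no key context, so reassignment fails.
    try:
        redistribute([Record("x1", None)], 2)
    except ValueError as err:
        print("forward channel:", err)
```

Because records with the same key always map to the same subtask and are appended in their recovered order, per-key ordering survives the rescale for keyby channels. A record with {{key=None}}, modeling the forward channel, has no correct target, which is the limitation the issue describes.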
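The channel-count argument for the 1.13 solution can be made concrete with a quick calculation. This sketch assumes the example topology {{source -> keyby -> task1 -> forward -> task2}} with every operator at the same parallelism p, so the keyby exchange has p × p channels and the forward exchange has p channels; other pointwise patterns (e.g. between operators of different parallelism) are not modeled. {{channel_counts}} and {{pointwise_share}} are hypothetical helper names, and channel count is only a rough proxy for how much in-flight data each kind of exchange holds.

```python
def channel_counts(parallelism: int) -> tuple[int, int]:
    """Return (pointwise, all_to_all) channel counts for one exchange of each kind."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    pointwise = parallelism        # forward: upstream subtask i feeds only downstream subtask i
    all_to_all = parallelism ** 2  # keyby: every upstream subtask feeds every downstream subtask
    return pointwise, all_to_all


def pointwise_share(parallelism: int) -> float:
    """Fraction of all channels in the example topology that are pointwise."""
    pointwise, all_to_all = channel_counts(parallelism)
    return pointwise / (pointwise + all_to_all)


if __name__ == "__main__":
    for p in (1, 2, 10, 100):
        pw, a2a = channel_counts(p)
        print(f"p={p:>3}: pointwise={pw:>3}  all-to-all={a2a:>5}  "
              f"pointwise share={pointwise_share(p):.1%}")
```

In this topology the pointwise share is p / (p + p²) = 1 / (1 + p): half of all channels at p = 1, but under 1% at p = 100.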