[ 
https://issues.apache.org/jira/browse/FLINK-21936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307388#comment-17307388
 ] 

Arvid Heise commented on FLINK-21936:
-------------------------------------

A completely different approach would be possible with dynamic rescaling 
(epoch-based). We would drain the recovered data (with old parallelism) and 
then rewire from source to sink. However, that feels like Flink 3.0.

> Disable checkpointing of inflight data in pointwise connections for unaligned 
> checkpoints
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-21936
>                 URL: https://issues.apache.org/jira/browse/FLINK-21936
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Arvid Heise
>            Assignee: Arvid Heise
>            Priority: Major
>
> We currently do not have any hard guarantees on pointwise connections 
> regarding data consistency. However, since data was structured implicitly in 
> the same way as any preceding source or keyby, some users relied on this 
> behavior to divide compute-intensive tasks into smaller chunks while relying 
> on ordering guarantees.
> As long as the parallelism does not change, unaligned checkpoints (UC) 
> retain these properties. With the implementation of rescaling of UC 
> (FLINK-19801), that has changed. For most exchanges, there is a meaningful 
> way to reassign state from one channel to another (even in random order). For 
> some exchanges, the mapping is ambiguous and requires post-filtering. 
> However, for pointwise connections, such a reassignment is impossible while 
> retaining these properties.
> Consider {{source -> keyby -> task1 -> forward -> task2}}. Now, if we want to 
> rescale from parallelism p = 1 to p = 2, suddenly the records inside the 
> keyby channels need to be divided into two channels according to the 
> keygroups. That is easily possible by using the keygroup ranges of the 
> operators and a way to determine the key(group) of the record (independent of 
> the actual approach). For the forward channel, we completely lack the key 
> context. No record in the forward channel has any keygroup assigned; it's 
> also not possible to calculate it as there is no guarantee that the key is 
> still present.
> The root cause for this limitation is the conceptual mismatch between what we 
> provide and what some users assume we provide (or we assume that the users 
> assume). For example, it's impossible to use (keyed) state in task2 right 
> now, because there is no key context, but we still want to guarantee 
> ordering with respect to that key context.
> For 1.13, the easiest solution is to disable channel state in pointwise 
> connections. For any non-trivial application with at least one shuffle, the 
> number of pointwise channels (linear in p) is quickly dwarfed by all-to-all 
> connections (quadratic in p). I'll add some alternative ideas to the 
> discussion.
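
The keyby case in the description above can be sketched roughly as follows. This is a minimal illustration, not Flink's implementation: the class and method names are hypothetical, the key-group assignment is simplified to a plain hash modulo the maximum parallelism, and each buffered record is represented by its key alone. It shows why records in a keyed channel can be split on rescaling; the same routine cannot be applied to a forward channel because the key is no longer available there.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; names are hypothetical and not Flink's API. */
final class KeyGroupSketch {

    /** Simplified key-group assignment (assumption): plain hash modulo maxParallelism. */
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the subtask whose contiguous key-group range contains keyGroup. */
    static int operatorIndex(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /**
     * Splits the records buffered in one keyed channel across the subtasks of
     * the new parallelism. The relative order of records is kept per channel.
     */
    static List<List<String>> redistribute(
            List<String> bufferedKeys, int maxParallelism, int newParallelism) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            channels.add(new ArrayList<>());
        }
        for (String key : bufferedKeys) {
            int keyGroup = keyGroupOf(key, maxParallelism);
            int target = operatorIndex(maxParallelism, newParallelism, keyGroup);
            channels.get(target).add(key);
        }
        return channels;
    }
}
```

Calling `KeyGroupSketch.redistribute(List.of("a", "0", "a"), 128, 2)` routes each record by its key group alone. A record in the forward channel has no key to feed into `keyGroupOf`, which is the gap the description points out.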



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
