[ 
https://issues.apache.org/jira/browse/FLINK-21378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arvid Heise closed FLINK-21378.
-------------------------------
    Resolution: Incomplete

The idea is incomplete; see FLINK-21936.

> Rescale pointwise connection during unaligned checkpoint recovery
> -----------------------------------------------------------------
>
>                 Key: FLINK-21378
>                 URL: https://issues.apache.org/jira/browse/FLINK-21378
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Arvid Heise
>            Assignee: Arvid Heise
>            Priority: Major
>
> FLINK-19801 added support for rescaling of unaligned checkpoints through 
> virtual channels: a mapping of old to new channel infos is used to create a 
> virtual channel that demultiplexes buffers from different original channels 
> over the same physical channel.
> The calculation in FLINK-19801, however, assumes that subpartition = channel 
> index, which holds for all fully connected exchanges but not for pointwise 
> connections. For pointwise connections, each subtask has only a few channels, 
> and each channel corresponds to one particular subpartition.
> A possible approach is to actually use the subpartition information while 
> constructing {{InflightDataRescalingDescriptor}} in {{TaskStateAssignment}}. 
> Thus, instead of taking the subtask index as the channel index, we should 
> take the subpartition as the channel index. The easiest way to implement this 
> is to translate the subtask index to a subpartition index and then calculate 
> the channel index from it.
> For that, the following changes are needed:
> * {{StateAssignmentOperation}} attaches the (upstream/downstream) -> 
> subpartition mapping to all assignments of pointwise exchanges. The 
> information can be derived through {{ExecutionEdge}} -> 
> {{IntermediateResultPartition.partitionNumber}} (note that at the execution 
> graph level, a subpartition is called a partition).
> * For non-pointwise exchanges, this mapping is the identity function.
> * {{TaskStateAssignment}} uses this additional lookup to translate subtask 
> mappings to subpartition mappings, which can then be used to calculate the 
> channel indexes on both the input and output side.
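
To illustrate the quoted proposal, here is a minimal sketch of why the subtask index and the subpartition/channel index differ for pointwise exchanges but coincide for fully connected ones. It is a toy model, not Flink code: the class `PointwiseChannelSketch`, its methods, and the connectivity rule in `connected` are all assumptions made for illustration and are not claimed to match Flink's actual pointwise distribution pattern. `InflightDataRescalingDescriptor`, `TaskStateAssignment` and `ExecutionEdge` are not modeled.

```java
/** Toy model of channel numbering; not Flink code, all names are hypothetical. */
class PointwiseChannelSketch {

    /** Assumed connectivity rule: neighbours are grouped by integer-scaling the subtask index. */
    static boolean connected(int up, int down, int upPar, int downPar, boolean pointwise) {
        if (!pointwise) {
            return true; // fully connected: every upstream subtask feeds every downstream subtask
        }
        return upPar >= downPar
                ? up * downPar / upPar == down   // several upstream subtasks feed one downstream subtask
                : down * upPar / downPar == up;  // one upstream subtask feeds several downstream subtasks
    }

    /** Output side: index of the subpartition of upstream subtask `up` that `down` reads. */
    static int subpartitionIndex(int up, int down, int upPar, int downPar, boolean pointwise) {
        requireConnected(up, down, upPar, downPar, pointwise);
        int index = 0;
        for (int d = 0; d < down; d++) {
            if (connected(up, d, upPar, downPar, pointwise)) {
                index++; // each connected downstream subtask with a lower index owns an earlier subpartition
            }
        }
        return index;
    }

    /** Input side: index of the channel of downstream subtask `down` that reads from `up`. */
    static int inputChannelIndex(int up, int down, int upPar, int downPar, boolean pointwise) {
        requireConnected(up, down, upPar, downPar, pointwise);
        int index = 0;
        for (int u = 0; u < up; u++) {
            if (connected(u, down, upPar, downPar, pointwise)) {
                index++; // each connected upstream subtask with a lower index owns an earlier channel
            }
        }
        return index;
    }

    private static void requireConnected(int up, int down, int upPar, int downPar, boolean pointwise) {
        if (!connected(up, down, upPar, downPar, pointwise)) {
            throw new IllegalArgumentException("subtasks " + up + " and " + down + " are not connected");
        }
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 4, looking at the edge 1 -> 3.
        System.out.println("pointwise subpartition:       " + subpartitionIndex(1, 3, 2, 4, true));
        System.out.println("fully connected subpartition: " + subpartitionIndex(1, 3, 2, 4, false));
    }
}
```

In this model the fully connected case degenerates to the identity (subpartition index = downstream subtask index, input channel index = upstream subtask index), which is the assumption the quoted description attributes to FLINK-19801. For the pointwise case the indexes are smaller than the subtask indexes, so a lookup from subtask to subpartition is needed before a channel index can be derived.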



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
