[ https://issues.apache.org/jira/browse/FLINK-21378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arvid Heise closed FLINK-21378.
-------------------------------
    Resolution: Incomplete

The idea is incomplete, see FLINK-21936.

> Rescale pointwise connection during unaligned checkpoint recovery
> -----------------------------------------------------------------
>
>                 Key: FLINK-21378
>                 URL: https://issues.apache.org/jira/browse/FLINK-21378
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Arvid Heise
>            Assignee: Arvid Heise
>            Priority: Major
>
> FLINK-19801 added support for rescaling of unaligned checkpoints through
> virtual channels: a mapping of old to new channel infos helped to create a
> virtual channel that demultiplexes buffers from different original channels
> over the same physical channel.
> The calculation in FLINK-19801, however, assumes that subpartition = channel
> index, which holds for all fully connected exchanges but not for pointwise
> connections. For pointwise connections, there are only a few channels per
> subtask, and each corresponds to one particular subpartition.
> A possible approach is to actually use the subpartition information while
> constructing {{InflightDataRescalingDescriptor}} in {{TaskStateAssignment}}.
> Thus, instead of taking the subtask index as the channel index, we should take
> the subpartition as the channel index. The easiest way to implement this is to
> translate the subtask index to a subpartition index and then calculate the
> channel index from it.
> For that, the following changes are needed:
> * {{StateAssignmentOperation}} attaches the (upstream/downstream) ->
> subpartition mapping to all assignments of pointwise exchanges. The
> information can be derived through {{ExecutionEdge}} ->
> {{IntermediateResultPartition.partitionNumber}} (note that at the execution
> graph level, a subpartition is called a partition).
> * For non-pointwise exchanges, this mapping is the identity function.
> * {{TaskStateAssignment}} uses this additional lookup to translate subtask
> mappings to subpartition mappings, which can then be used to calculate the
> channel indexes on both the input and the output side.
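The subtask-to-subpartition translation described in the issue can be illustrated with a small sketch. It assumes a simplified, regular pointwise pattern in which the larger parallelism is an exact multiple of the smaller one (Flink's actual edge construction also handles irregular ratios). The class {{PointwiseIndexSketch}} and all of its methods are hypothetical illustrations, not Flink classes or APIs; they only show why "subtask index = channel index" breaks for pointwise exchanges while remaining correct for all-to-all exchanges.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch, not Flink's API: index bookkeeping for one exchange between an
 * upstream vertex (parallelism "up") and a downstream vertex (parallelism "down").
 * Assumes a regular pointwise pattern where the larger parallelism is an exact
 * multiple of the smaller one.
 */
final class PointwiseIndexSketch {

    /** Upstream subtasks read by downstream subtask d, in input-channel order (pointwise). */
    static List<Integer> upstreamsOf(int up, int down, int d) {
        List<Integer> result = new ArrayList<>();
        if (up >= down) {
            // Fan-in: d reads a contiguous block of up / down upstream subtasks.
            int factor = up / down;
            for (int u = d * factor; u < (d + 1) * factor; u++) {
                result.add(u);
            }
        } else {
            // Fan-out: down / up consecutive downstream subtasks share one upstream subtask.
            result.add(d / (down / up));
        }
        return result;
    }

    /** Output side, pointwise: subpartition of upstream u that is consumed by downstream d. */
    static int subpartitionIndex(int up, int down, int u, int d) {
        requireConnected(up, down, u, d);
        // Fan-in: each upstream subtask has a single subpartition (index 0).
        // Fan-out: upstream u has one subpartition per consuming downstream subtask.
        return up >= down ? 0 : d % (down / up);
    }

    /** Input side, pointwise: input channel of downstream d that reads from upstream u. */
    static int inputChannelIndex(int up, int down, int u, int d) {
        requireConnected(up, down, u, d);
        return upstreamsOf(up, down, d).indexOf(u);
    }

    /** All-to-all: the subpartition index equals the downstream subtask index. */
    static int subpartitionIndexAllToAll(int u, int d) {
        return d;
    }

    /** All-to-all: the input channel index equals the upstream subtask index. */
    static int inputChannelIndexAllToAll(int u, int d) {
        return u;
    }

    /** Rejects (u, d) pairs that the pointwise pattern does not connect. */
    private static void requireConnected(int up, int down, int u, int d) {
        if (!upstreamsOf(up, down, d).contains(u)) {
            throw new IllegalArgumentException(
                    "upstream " + u + " is not connected to downstream " + d);
        }
    }

    public static void main(String[] args) {
        // Fan-out (up = 2, down = 4): downstream 3 reads subpartition 1 of upstream 1,
        // while reusing the all-to-all rule would wrongly give subpartition 3.
        System.out.println(subpartitionIndex(2, 4, 1, 3));   // 1
        System.out.println(subpartitionIndexAllToAll(1, 3)); // 3
        // Fan-in (up = 4, down = 2): downstream 1 reads upstreams 2 and 3 on channels 0 and 1,
        // while reusing the all-to-all rule would wrongly give channel 3.
        System.out.println(inputChannelIndex(4, 2, 3, 1));   // 1
        System.out.println(inputChannelIndexAllToAll(3, 1)); // 3
    }
}
```

In this sketch the all-to-all methods are the identity on subtask indexes, mirroring the issue's note that the mapping is the identity for non-pointwise exchanges; only the pointwise methods need the extra lookup of which upstream subtasks a downstream subtask is connected to.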