[ 
https://issues.apache.org/jira/browse/FLINK-19632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17223499#comment-17223499
 ] 

Yuan Mei commented on FLINK-19632:
----------------------------------

Here are some good discussions about the motivation for introducing a new 
ResultPartitionType, which I think are worth recording here.
{quote}I wanted to ask why we need a special ResultPartitionType for the 
approximate local recovery? Shouldn't it be conceptually possible that we 
support the normal and approximative recovery behaviour with the same pipelined 
partitions?
{quote}
Conceptually speaking, yes, it is possible to unify normal and approximate 
pipelined partitions; practically speaking, I am not 100% sure they can be. 
Two main considerations led me to introduce a different type.

*1) The first and most important reason is isolating changes to avoid affecting 
the normal execution path.*
Since I am not sure whether the two can be unified, I went with the safe step 
first. This also helps identify the differences between the two types (there 
might be more differences for downstream reconnection as well). There could be 
corner cases that I will not be aware of until the real implementation.

Even if unification is conceptually possible, having *different implementation 
subclasses for different connection behaviors* does seem reasonable: it 
simplifies the logic for each behavior. *So personally, I am leaning towards 
not unifying them*.

That said, if it turns out to be cleaner and simpler to unify the two types, I 
have no objection to doing so. But for safety and ease of development, 
starting with a different type seems the better choice.

*2) Differences between these two types.*
For upstream reconnection, the two types mainly differ in two respects: *read* 
and *release* behavior.
 * In normal pipelined mode, each subpartition's view is created once and 
released when the downstream disconnects. Releasing the view releases the 
subpartition, and eventually the partition.
 * In approximate mode, a subpartition's view can be created and released 
multiple times, as long as at most one view exists for the subpartition at 
any instant.
 ** for reading: upon reconnection, the reader should clean up the partial 
record left by the downstream failure (this could be easily unified).
 ** for release: a partition is released only when it is fully consumed (all 
data read) or its producer fails. The partition must not be released when all 
its views are released, because new views can be created (a bit difficult 
under the current setting; let's discuss this in the lifecycle part later).
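To make the contrast concrete, here is a minimal sketch of the approximate-mode 
release semantics described above. This is not Flink's actual code; the class 
and method names ({{ApproximateSubpartition}}, {{createView}}, etc.) are 
invented for illustration:

```java
// Hypothetical sketch, not Flink's actual classes: illustrates that in
// approximate mode a view can be recreated after release, and that the
// subpartition outlives its views.
class ApproximateSubpartition {

    static class View {
        boolean released = false;
    }

    private View currentView;            // at most one live view at any instant
    private boolean allDataConsumed = false;
    private boolean producerFailed = false;

    /** Upon (re)connection: release any stale view left behind by a
     *  failed downstream, then hand out a fresh one. */
    synchronized View createView() {
        if (currentView != null && !currentView.released) {
            currentView.released = true; // previous consumer is gone
        }
        currentView = new View();
        return currentView;
    }

    /** Releasing a view does NOT release the subpartition: the
     *  downstream may reconnect and ask for a new view later. */
    synchronized void releaseView(View view) {
        view.released = true;
    }

    /** The partition is releasable only when fully consumed or when its
     *  producer failed, never merely because all views are released. */
    synchronized boolean isReleasable() {
        return allDataConsumed || producerFailed;
    }

    synchronized void markAllDataConsumed() {
        allDataConsumed = true;
    }

    synchronized void markProducerFailed() {
        producerFailed = true;
    }
}
```

In the normal pipelined mode, {{releaseView}} would instead trigger 
subpartition (and eventually partition) release; keeping that difference in a 
separate subclass is exactly the isolation argued for in 1).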

{quote}If we say that we can reconnect to every pipelined result partition 
(including dropping partially consumed results), then it can be the 
responsibility of the scheduler to make sure that producers are restarted as 
well in order to ensure exactly/at-least once processing guarantees. If not, 
then we would simply consume from where we have left off.
{quote}
This seems true for now, since we can achieve exactly-once/at-least-once 
through RegionPipeline failover, and approximate recovery through single-task 
failover. But I am not sure about the future. If we later want to support 
single-task failover with at-least-once/exactly-once guarantees, where channel 
data may be persisted somewhere, I cannot say for sure that this is purely a 
scheduler decision. There is a good chance we would end up introducing more 
connection types for single-task failover to support 
at-least-once/exactly-once.
{quote}As far as I understand the existing 
ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the 
result partition if the downstream consumer disconnects. I believe that this is 
not a strict contract of pipelined result partitions but more of an 
implementation artefact. Couldn't we solve the problem of disappearing 
pipelined result partitions by binding the lifecycle of a pipelined result 
partition to the lifecycle of a Task? We could say that a Task can only 
terminate once the pipelined result partition has been consumed. Moreover, a 
Task will clean up the result partition if it fails or gets canceled. That way, 
we have a clearly defined lifecycle and make sure that these results get 
cleaned up (iff the Task reaches a terminal state).
{quote}
I totally agree.

Right now, the life cycle of {{ResultPartitionType.PIPELINED(_BOUNDED)}} is 
"bound" to the consumer task, which is not very intuitive but reasonable: 
{{PIPELINED(_BOUNDED)}} is consumed only once, and whenever the downstream 
restarts, the upstream restarts correspondingly.

Is it reasonable to bind the partition to the producer instead? Yes, I think 
that follows the best intuition, as long as we make the task terminate only 
after its produced result partition is consumed. I think this can also 
simplify logic that should be applied to partitions through task resources but 
currently cannot be, because the task has already terminated.
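A small sketch of that producer-bound lifecycle, under the rule quoted above 
(a Task can only terminate once its pipelined result partition has been 
consumed, and cleans the partition up on failure). Again, the names 
({{ProducerTask}}, {{onPartitionConsumed}}, etc.) are invented for 
illustration, not Flink's actual API:

```java
// Hypothetical sketch: binding the partition's lifecycle to the
// producer task, as proposed in the quoted comment.
class ProducerTask {

    enum State { RUNNING, FINISHED, FAILED }

    private State state = State.RUNNING;
    private boolean allDataProduced = false;
    private boolean partitionConsumed = false;
    private boolean partitionReleased = false;

    /** The producer finished emitting, but may not terminate yet. */
    synchronized void onAllDataProduced() {
        allDataProduced = true;
        maybeFinish();
    }

    /** The downstream fully consumed the pipelined result partition. */
    synchronized void onPartitionConsumed() {
        partitionConsumed = true;
        maybeFinish();
    }

    /** The task reaches FINISHED only once its produced partition has
     *  been consumed; the partition is cleaned up at that point. */
    private void maybeFinish() {
        if (allDataProduced && partitionConsumed && state == State.RUNNING) {
            state = State.FINISHED;
            partitionReleased = true;
        }
    }

    /** On failure/cancellation the task itself cleans up the partition,
     *  so results are cleaned up iff the task reaches a terminal state. */
    synchronized void fail() {
        state = State.FAILED;
        partitionReleased = true;
    }

    synchronized State getState() {
        return state;
    }

    synchronized boolean isPartitionReleased() {
        return partitionReleased;
    }
}
```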

 

> Introduce a new ResultPartitionType for Approximate Local Recovery
> ------------------------------------------------------------------
>
>                 Key: FLINK-19632
>                 URL: https://issues.apache.org/jira/browse/FLINK-19632
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Yuan Mei
>            Assignee: Yuan Mei
>            Priority: Major
>              Labels: pull-request-available
>
> # On downstream node failure, the upstream node needs to release a 
> sub-partition view while keeping the result partition
>  # When the downstream node reconnects, the subpartition view needs to be 
> recreated. If the previous subpartition view still exists, it needs to be 
> released.
> Clean up of the partial records should be handled in a separate Jira ticket 
> FLINK-19547
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
