[
https://issues.apache.org/jira/browse/FLINK-19632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17223499#comment-17223499
]
Yuan Mei edited comment on FLINK-19632 at 10/30/20, 8:51 AM:
-------------------------------------------------------------
Here are some good discussions about the motivation for introducing a new
ResultPartitionType, which I think are worth recording here.
quote from [~trohrmann]
{quote}I wanted to ask why we need a special ResultPartitionType for the
approximate local recovery? Shouldn't it be conceptually possible that we
support the normal and approximative recovery behaviour with the same pipelined
partitions?
{quote}
Conceptually speaking, yes, it is possible to unify normal and approximate
pipelined partitions. Practically speaking, I am not 100% sure they can be.
There are mainly two considerations that lead me to introduce a different type.
*1) The first and most important reason is isolating changes to avoid affecting
the normal execution path.*
Since I am not sure whether the two can be unified, I am going with the safe
step first. This also helps identify the differences between the two types
(there might be more differences around downstream reconnection as well), and
there could be corner cases that I will not be aware of until the real
implementation.
Even if they can be unified conceptually, having *different implementation
subclasses for different connection behavior* does seem reasonable: it
simplifies the logic for each behavior. *So personally, I am leaning toward not
unifying them.* That said, if it turns out to be cleaner and simpler to unify
the two types, I have no objection to doing so. But for safety and ease of
development, starting with a different type seems the better choice.
*2) Differences between these two types.*
For upstream reconnection, the two types differ mainly in two respects: *read*
and *release* behavior.
* In normal pipelined mode, each subpartition's view is created once and
released when the downstream disconnects. Releasing the view releases the
subpartition, and eventually the partition.
* In approximate mode, a subpartition's view can be created and released
multiple times, as long as at most one view exists for the subpartition at any
instant.
** For reading: upon reconnection, the reader should clean up the partial
record left by the downstream failure (this could easily be unified).
** For release: a partition is released only when it is fully consumed (all
data read) or its producer fails. The partition must not be released when all
of its views are released, because new views can still be created. (This is a
bit difficult under the current setting; let's discuss it in the lifecycle part
later.)
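To make the release-semantics difference above concrete, here is a minimal
hypothetical sketch (these are not the actual Flink classes; the class and
method names are invented for illustration only):

```java
// Sketch of the view/release difference between normal pipelined and
// approximate subpartitions. All names here are hypothetical.
class SubpartitionSketch {
    private Object currentView;      // at most one view at any instant
    private boolean allDataRead;     // consumer has read everything
    private boolean producerFailed;
    private boolean released;
    private final boolean approximate;

    SubpartitionSketch(boolean approximate) { this.approximate = approximate; }

    /** Creates a view; in approximate mode a stale view is released first. */
    Object createView() {
        if (currentView != null) {
            if (!approximate) {
                throw new IllegalStateException(
                        "normal pipelined: view may be created only once");
            }
            releaseView(); // downstream failed earlier; drop the stale view
        }
        currentView = new Object();
        return currentView;
    }

    /** Releasing the view cascades to partition release only in normal mode. */
    void releaseView() {
        currentView = null;
        if (!approximate) {
            released = true; // normal pipelined: view release -> partition release
        }
        // approximate: keep the partition alive so a new view can be created
    }

    void markAllDataRead()    { allDataRead = true;    maybeRelease(); }
    void markProducerFailed() { producerFailed = true; maybeRelease(); }

    private void maybeRelease() {
        if (allDataRead || producerFailed) { released = true; }
    }

    boolean isReleased() { return released; }
}
```

The key point is in {{releaseView()}}: only the approximate variant survives
the loss of its view, which is exactly why unifying the two behaviors in one
type is not obviously safe.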
quote from [~trohrmann]
{quote}If we say that we can reconnect to every pipelined result partition
(including dropping partially consumed results), then it can be the
responsibility of the scheduler to make sure that producers are restarted as
well in order to ensure exactly/at-least once processing guarantees. If not,
then we would simply consume from where we have left off.
{quote}
This seems true for now, since we can achieve exactly-once/at-least-once
through region pipelined failover, and approximate recovery through single-task
failover. But I am not sure about the future. If we later want to support
single-task failover with at-least-once/exactly-once guarantees, where channel
data may be persisted somewhere, I cannot say for sure that this is purely a
scheduler decision. There is a good chance we would end up introducing more
connection types for single-task failover to support
at-least-once/exactly-once.
quote from [~trohrmann]
{quote}As far as I understand the existing
ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the
result partition if the downstream consumer disconnects. I believe that this is
not a strict contract of pipelined result partitions but more of an
implementation artefact. Couldn't we solve the problem of disappearing
pipelined result partitions by binding the lifecyle of a pipelined result
partition to the lifecycle of a Task? We could say that a Task can only
terminate once the pipelined result partition has been consumed. Moreover, a
Task will clean up the result partition if it fails or gets canceled. That way,
we have a clearly defined lifecycle and make sure that these results get
cleaned up (iff the Task reaches a terminal state).
{quote}
I totally agree.
Right now, the life cycle of {{ResultPartitionType.PIPELINED(_BOUNDED)}} is
“bound” to the consumer task. This is not very intuitive, but it is reasonable,
because {{PIPELINED(_BOUNDED)}} is consumed only once, and whenever the
downstream restarts, the upstream restarts correspondingly.
Is it reasonable to bind the partition to the producer instead? Yes, I think
that follows intuition best, as long as we make the task terminate only after
its produced result partition has been consumed. I think this can also simplify
logic that needs to be applied to partitions through task resources but
currently cannot be, because the task has already terminated.
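The producer-bound lifecycle proposed above can be sketched roughly as follows
(a hypothetical model, not Flink's actual Task implementation; all names are
invented for illustration):

```java
// Sketch: a producing task may only FINISH once its pipelined result
// partition has been fully consumed; on failure/cancellation the task
// itself cleans the partition up. All names here are hypothetical.
class ProducerBoundPartitionSketch {
    enum TaskState { RUNNING, FINISHED, FAILED, CANCELED }

    private TaskState state = TaskState.RUNNING;
    private boolean partitionConsumed;
    private boolean partitionReleased;

    /** The producer reaches FINISHED only after its partition is consumed. */
    boolean tryFinish() {
        if (!partitionConsumed) {
            return false; // keep the task (and thus the partition) alive
        }
        state = TaskState.FINISHED;
        partitionReleased = true;
        return true;
    }

    void notifyPartitionConsumed() { partitionConsumed = true; }

    /** On failure or cancellation the task cleans up its own partition. */
    void failOrCancel(TaskState terminal) {
        state = terminal;
        partitionReleased = true;
    }

    boolean isPartitionReleased() { return partitionReleased; }
    TaskState state() { return state; }
}
```

This gives the clearly defined lifecycle from the quote: the partition is
guaranteed to be cleaned up iff the task reaches a terminal state.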
> Introduce a new ResultPartitionType for Approximate Local Recovery
> ------------------------------------------------------------------
>
> Key: FLINK-19632
> URL: https://issues.apache.org/jira/browse/FLINK-19632
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Yuan Mei
> Assignee: Yuan Mei
> Priority: Major
> Labels: pull-request-available
>
> # On downstream node failure, the upstream node needs to release a
> sub-partition view while keeping the result partition
> # When the downstream node reconnects, the subpartition view needs to be
> recreated. If the previous subpartition view still exists, it needs to be
> released.
> Clean up of the partial records should be handled in a separate Jira ticket
> FLINK-19547
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)