Hi Yun,

> b) With unaligned checkpoints enabled, the slowdown might happen if
> the downstream task processes very slowly.
I think UC will be the common case with multiple sources each with DoP > 1.
IIUC, waiting for EoP will be needed on each subtask each time one of its
source subtasks finishes.
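
To make the per-subtask waiting concrete, here is a rough sketch of the
bookkeeping I have in mind. It is a toy model, not Flink code: the class
and method names are hypothetical, and it only assumes that a subtask can
observe EoP on each of its input channels.

```java
import java.util.BitSet;

// Toy model, not Flink code: class and method names are hypothetical.
class ToyEndOfPartitionTracker {
    private final int numChannels;
    private final BitSet ended;

    ToyEndOfPartitionTracker(int numChannels) {
        this.numChannels = numChannels;
        this.ended = new BitSet(numChannels);
    }

    /** Records EoP on one input channel; returns true once every channel has ended. */
    boolean onEndOfPartition(int channelIndex) {
        if (channelIndex < 0 || channelIndex >= numChannels) {
            throw new IllegalArgumentException("unknown channel " + channelIndex);
        }
        ended.set(channelIndex); // idempotent: a repeated EoP changes nothing
        return allEnded();
    }

    /** True only when EoP has been seen on all input channels. */
    boolean allEnded() {
        return ended.cardinality() == numChannels;
    }
}
```

With DoP > 1 upstream, each downstream subtask has several such channels,
so the snapshot can only start once the slowest channel has delivered EoP.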

> But since only the result partition of the finished upstream task needs
> to wait to be processed, the other parts of the execution graph could
> still perform the unaligned checkpoint normally
Yes, but the checkpoint completion notification will not be sent until all
the EoPs are processed.

> Declining the RPC-triggered checkpoint would indeed simplify the
> implementation, but since by default a failed checkpoint currently causes
> job failover, we have some concerns about directly declining the checkpoint.
Not all declines cause job failure; in particular,
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
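
Roughly, the behaviour I mean can be modelled like this. This is a toy
sketch and not the actual failure-handling code: ToyFailureCounter and
SOME_OTHER_FAILURE are hypothetical names, and the only thing taken from
this thread is that CHECKPOINT_DECLINED_TASK_NOT_READY is not counted as a
failure.

```java
import java.util.EnumSet;
import java.util.Set;

// Toy model only. SOME_OTHER_FAILURE is a hypothetical stand-in for
// decline reasons that do count towards failing the job.
enum DeclineReason {
    CHECKPOINT_DECLINED_TASK_NOT_READY,
    SOME_OTHER_FAILURE
}

class ToyFailureCounter {
    // Reasons that are ignored when deciding whether to fail the job.
    private static final Set<DeclineReason> IGNORED =
            EnumSet.of(DeclineReason.CHECKPOINT_DECLINED_TASK_NOT_READY);

    private final int tolerableFailures;
    private int countedFailures;

    ToyFailureCounter(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Returns true if the job should fail over after this decline. */
    boolean onDecline(DeclineReason reason) {
        if (IGNORED.contains(reason)) {
            return false; // declined, but never triggers failover
        }
        countedFailures++;
        return countedFailures > tolerableFailures;
    }
}
```

With zero tolerable failures (the "fail on any failed checkpoint" case),
a TASK_NOT_READY decline still only aborts that one checkpoint.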

> Thus another possible option might be to let the upstream task wait until
> all the pending buffers in the result partition have been flushed before
> finishing.
This is what I meant by "postpone JM notification from source". Just
blocking the task thread wouldn't add much complexity, though I'm not sure
if it would cause any problems.
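
As a rough illustration of "blocking the task thread", something along
these lines is what I picture. It is only a sketch under assumptions:
ToyResultPartition and its methods are hypothetical, not the real result
partition API, and the timeout is there just so the example cannot hang.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy model, not Flink code: all names here are hypothetical.
class ToyResultPartition {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition drained = lock.newCondition();
    private int pendingBuffers;

    /** Called by the producing task when it emits a buffer. */
    void addBuffer() {
        lock.lock();
        try {
            pendingBuffers++;
        } finally {
            lock.unlock();
        }
    }

    /** Called when the downstream side has consumed one buffer. */
    void onBufferConsumed() {
        lock.lock();
        try {
            if (pendingBuffers > 0 && --pendingBuffers == 0) {
                drained.signalAll(); // wake up a task waiting to finish
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks the caller until no buffers are pending; false on timeout or interrupt. */
    boolean awaitDrained(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (pendingBuffers > 0) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = drained.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

The task would call awaitDrained() after emitting its last record and
report FINISHED to the JM only once it returns true.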

> do you think it would be ok for us to view it as an optimization and
> postpone it to future versions?
I think that's a good idea.

Regards,
Roman


On Mon, Jan 11, 2021 at 11:03 AM Yun Gao <yungao...@aliyun.com> wrote:

>       Hi Roman,
>
>           Many thanks for the feedback!
>
>
> > Probably it would be simpler to just decline the RPC-triggered checkpoint
> > if not all inputs of this task are finished (with
> > CHECKPOINT_DECLINED_TASK_NOT_READY).
>
> > But I wonder how significantly this waiting for EoP from every input will
> > delay performing the first checkpoint by B after becoming a new source.
> > This may in turn impact exactly-once sinks and incremental checkpoints.
> > Maybe a better option would be to postpone JM notification from source
> > until its EoP is consumed?
>
> I also agree that there could indeed be cases where the checkpoint gets
> slower, since it cannot skip the data in the result partition of the
> finished upstream task:
>    a) For aligned checkpoints, this would not happen, since the
>       downstream tasks always need to process the buffers in order.
>    b) With unaligned checkpoints enabled, the slowdown might happen if
>       the downstream task processes very slowly.
>
> But since only the result partition of the finished upstream task needs
> to wait to be processed, the other parts of the execution graph could
> still perform the unaligned checkpoint normally. I think the average delay
> would be much lower than with a completely aligned checkpoint, but there
> would still be extremely bad cases where the delay is long.
>
> Declining the RPC-triggered checkpoint would indeed simplify the
> implementation, but since by default a failed checkpoint currently causes
> job failover, we have some concerns about directly declining the checkpoint.
> As for postponing the JM notification: since the JM currently cannot know
> whether the task has received EndOfPartition from all the upstream tasks,
> we might need to introduce a new RPC for notifying the state, and since
> the triggering is not atomic, we may also run into some synchronization
> issues between JM and TM, which would introduce some complexity.
>
> Thus another possible option might be to let the upstream task wait until
> all the pending buffers in the result partition have been flushed before
> finishing. We could do the wait only for the PipelineResultPartition so it
> won't affect batch jobs. With the waiting, the unaligned checkpoint could
> continue to trigger the upstream task and skip the buffers in the result
> partition. Since the result partition state would be kept within the
> operator state of the last operator, after failover we would find that the
> last operator has a non-empty state and we would restart the tasks
> containing this operator to resend the snapshotted buffers. Of course this
> would also introduce some complexity, and since the probability of a long
> delay would be lower than in the completely aligned case, do you think it
> would be ok for us to view it as an optimization and postpone it to future
> versions?
>
>      Best,
>      Yun
>
>
>
> ------------------------------------------------------------------
> From:Khachatryan Roman <khachatryan.ro...@gmail.com>
> Send Time:2021 Jan. 11 (Mon.) 05:46
> To:Yun Gao <yungao...@aliyun.com>
> Cc:Arvid Heise <ar...@ververica.com>; dev <dev@flink.apache.org>; user <
> u...@flink.apache.org>
> Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
> Finished
>
> Thanks a lot for your answers Yun,
>
> > In detail, suppose we have a job with the graph A -> B -> C, and suppose
> > that in one checkpoint A has reported FINISHED. CheckpointCoordinator
> > would choose B as the new "source" to trigger the checkpoint via RPC.
> > For task B, if it receives the checkpoint trigger, it would know that all
> > its preceding tasks are finished. It would then wait until all the
> > InputChannels have received EndOfPartition from the network (namely
> > inputChannel.onBuffer() is called with EndOfPartition) and then take a
> > snapshot of the input channels, as normal unaligned checkpoints do on
> > the InputChannel side. Then we would be able to ensure the finished
> > tasks always have an empty state.
>
> Probably it would be simpler to just decline the RPC-triggered checkpoint
> if not all inputs of this task are finished (with
> CHECKPOINT_DECLINED_TASK_NOT_READY).
>
> But I wonder how significantly this waiting for EoP from every input will
> delay performing the first checkpoint by B after becoming a new source.
> This may in turn impact exactly-once sinks and incremental checkpoints.
> Maybe a better option would be to postpone JM notification from source
> until its EoP is consumed?
>
> Regards,
> Roman
>
>
>
