Hi Roman,

Thanks a lot for the feedback!

> Probably it would be simpler to just decline the RPC-triggered checkpoint
> if not all inputs of this task are finished (with
> CHECKPOINT_DECLINED_TASK_NOT_READY).

> But I wonder how significantly this waiting for EoP from every input will
> delay performing the first checkpoint by B after becoming a new source.
> This may in turn impact exactly-once sinks and incremental checkpoints.
> Maybe a better option would be to postpone JM notification from source
> until its EoP is consumed?

I agree that there could indeed be cases where the checkpoint gets slower,
since it cannot skip the data in the result partition of the finished
upstream task:
    a) For aligned checkpoints, this would not happen, since the downstream
       tasks always need to process the buffers in order anyway.
    b) With unaligned checkpoints enabled, the slowdown might happen if the
       downstream task processes data very slowly.

However, only the result partition of the finished upstream task needs to be
fully processed first; the rest of the execution graph can still perform the
unaligned checkpoint normally. So I think the average delay would be much
lower than with a completely aligned checkpoint, although there could still
be extremely bad cases where the delay is long.

Declining the RPC-triggered checkpoint would indeed simplify the
implementation, but since a failed checkpoint currently causes a job failover
by default, we have some concerns about directly declining the checkpoint.

As for postponing the JM notification: since the JM currently cannot know
whether a task has received all the EndOfPartition events from its upstream
tasks, we might need to introduce a new RPC for reporting that state. And
since the triggering is not atomic, we may also run into synchronization
issues between the JM and the TM, which would introduce some complexity.
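
To make the difference between declining and waiting concrete, here is a
minimal sketch in Java. It is a toy model and not Flink code: the class
RpcTriggeredTaskModel, its Policy and Outcome enums, and all method names are
hypothetical and only mirror the terms used in this thread.

```java
import java.util.BitSet;

/**
 * Toy model (not Flink code) of a task such as B that is triggered via RPC
 * after its upstream tasks have reported FINISHED. All names are hypothetical.
 */
class RpcTriggeredTaskModel {

    /** The two ways of handling a trigger that arrives too early. */
    enum Policy { DECLINE_IF_NOT_READY, WAIT_FOR_END_OF_PARTITION }

    /** Possible outcomes of handling one RPC trigger. */
    enum Outcome { SNAPSHOT_NOW, DECLINED_TASK_NOT_READY, WAITING_FOR_END_OF_PARTITION }

    private final int numberOfInputChannels;
    private final BitSet finishedChannels;

    RpcTriggeredTaskModel(int numberOfInputChannels) {
        if (numberOfInputChannels < 0) {
            throw new IllegalArgumentException("negative channel count");
        }
        this.numberOfInputChannels = numberOfInputChannels;
        this.finishedChannels = new BitSet(numberOfInputChannels);
    }

    /** Records that EndOfPartition has arrived on the given input channel. */
    void onEndOfPartition(int channelIndex) {
        if (channelIndex < 0 || channelIndex >= numberOfInputChannels) {
            throw new IndexOutOfBoundsException("no such channel: " + channelIndex);
        }
        finishedChannels.set(channelIndex);
    }

    /** True once every input channel has delivered EndOfPartition. */
    boolean allInputsFinished() {
        return finishedChannels.cardinality() == numberOfInputChannels;
    }

    /** Decides what to do with an RPC trigger under the given policy. */
    Outcome onRpcTrigger(Policy policy) {
        if (allInputsFinished()) {
            return Outcome.SNAPSHOT_NOW;
        }
        return policy == Policy.DECLINE_IF_NOT_READY
                ? Outcome.DECLINED_TASK_NOT_READY
                : Outcome.WAITING_FOR_END_OF_PARTITION;
    }

    public static void main(String[] args) {
        RpcTriggeredTaskModel taskB = new RpcTriggeredTaskModel(2);
        taskB.onEndOfPartition(0); // only one of the two inputs has finished
        System.out.println(taskB.onRpcTrigger(Policy.DECLINE_IF_NOT_READY));
        System.out.println(taskB.onRpcTrigger(Policy.WAIT_FOR_END_OF_PARTITION));
        taskB.onEndOfPartition(1); // now all inputs have finished
        System.out.println(taskB.onRpcTrigger(Policy.DECLINE_IF_NOT_READY));
    }
}
```

The sketch only models the decision on the task side. It says nothing about
how the JM would react to a decline, which is exactly where the failover
concern above comes from.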

Another possible option might be to let the upstream task wait until all the
pending buffers in its result partition have been flushed before it finishes.
We could do the wait only for PipelinedResultPartition so that it won't
affect batch jobs. With the waiting, the unaligned checkpoint could still
trigger the upstream task and skip the buffers in the result partition. Since
the result partition state would be kept within the operator state of the
last operator, after a failover we would find that the last operator has a
non-empty state, and we would restart the tasks containing this operator to
resend the snapshotted buffers. Of course this would also introduce some
complexity. Since the probability of a long delay is lower than in the
completely aligned case, do you think it would be OK for us to treat this as
an optimization and postpone it to a future version?
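
As a rough illustration of this last option, the following Java sketch models
a result partition whose upstream task may only finish once its pending
buffers are drained. Again this is a toy model under my own assumptions, not
Flink code: PendingBufferPartitionModel and its methods are hypothetical, and
buffers are represented as plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Toy model (not Flink code) of a result partition whose upstream task waits
 * for pending buffers to be flushed before finishing. All names are hypothetical.
 */
class PendingBufferPartitionModel {

    private final boolean pipelined;
    private final Queue<String> pendingBuffers = new ArrayDeque<>();

    PendingBufferPartitionModel(boolean pipelined) {
        this.pipelined = pipelined;
    }

    /** The upstream task emits a buffer into the partition. */
    void emit(String buffer) {
        pendingBuffers.add(buffer);
    }

    /** The downstream task consumes the oldest buffer, or gets null if none is pending. */
    String consumeOne() {
        return pendingBuffers.poll();
    }

    /**
     * An unaligned checkpoint copies the in-flight buffers (oldest first)
     * instead of waiting for the downstream task to process them.
     */
    List<String> snapshotPendingBuffers() {
        return new ArrayList<>(pendingBuffers);
    }

    /**
     * Only a pipelined partition makes the upstream task wait; a blocking
     * (batch) partition lets it finish immediately.
     */
    boolean upstreamMayFinish() {
        return !pipelined || pendingBuffers.isEmpty();
    }

    public static void main(String[] args) {
        PendingBufferPartitionModel partition = new PendingBufferPartitionModel(true);
        partition.emit("buffer-1");
        partition.emit("buffer-2");
        System.out.println("may finish: " + partition.upstreamMayFinish());
        System.out.println("snapshot:   " + partition.snapshotPendingBuffers());
        partition.consumeOne();
        partition.consumeOne();
        System.out.println("may finish: " + partition.upstreamMayFinish());
    }
}
```

While the upstream task is still waiting, it remains triggerable, so the
checkpoint can capture the pending buffers rather than wait for them to drain.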

Best,
Yun


------------------------------------------------------------------
From:Khachatryan Roman <khachatryan.ro...@gmail.com>
Send Time:2021 Jan. 11 (Mon.) 05:46
To:Yun Gao <yungao...@aliyun.com>
Cc:Arvid Heise <ar...@ververica.com>; dev <dev@flink.apache.org>; user 
<u...@flink.apache.org>
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks a lot for your answers Yun,

> In detail, suppose we have a job with the graph A -> B -> C, and suppose
> that in one checkpoint A has reported FINISHED. The CheckpointCoordinator
> would then choose B as the new "source" to trigger the checkpoint via RPC.
> When task B receives the checkpoint trigger, it knows that all its
> preceding tasks are finished, so it would wait until all its InputChannels
> have received EndOfPartition from the network (namely,
> inputChannel.onBuffer() is called with EndOfPartition) and then take a
> snapshot of the input channels, as normal unaligned checkpoints do on the
> InputChannel side. Then we would be able to ensure that finished tasks
> always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint if 
not all inputs of this task are finished (with 
CHECKPOINT_DECLINED_TASK_NOT_READY).

But I wonder how significantly this waiting for EoP from every input will delay 
performing the first checkpoint by B after becoming a new source. This may in 
turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone JM notification from source until 
its EoP is consumed?

Regards,
Roman
