Hi Roman,
Many thanks for the feedback!
> Probably it would be simpler to just decline the RPC-triggered checkpoint
> if not all inputs of this task are finished (with
> CHECKPOINT_DECLINED_TASK_NOT_READY).
> But I wonder how significantly this waiting for EoP from every input will
> delay performing the first checkpoint by B after becoming a new source.
> This may in turn impact exactly-once sinks and incremental checkpoints.
> Maybe a better option would be to postpone JM notification from source
> until its EoP is consumed?
I agree that there could indeed be cases where the checkpoint gets slower,
since it cannot skip the data in the result partition of the finished
upstream task:
a) With aligned checkpoints this would not happen, since the downstream
tasks always need to process the buffers in order.
b) With unaligned checkpoints enabled, the slowdown might happen if the
downstream task processes data very slowly.
But since only the result partitions of the finished upstream tasks need to
wait to be processed, the other parts of the execution graph could still
perform the unaligned checkpoint normally. I think the average delay would
therefore be much lower than with completely aligned checkpoints, but there
could still be extremely bad cases where the delay is long.
Declining the RPC-triggered checkpoint would indeed simplify the
implementation, but since by default a failed checkpoint currently causes a
job failover, we have some concerns about directly declining the checkpoint.
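To make the "decline" option concrete, here is a minimal sketch in Java. It
is only a model under my own assumptions: RpcTriggerGate, Decision and the
method names are hypothetical and are not existing Flink classes, and the
enum value merely mirrors the CHECKPOINT_DECLINED_TASK_NOT_READY reason
mentioned above.

```java
import java.util.BitSet;

/**
 * Hypothetical model (not Flink code): a task that has become the new
 * "source" accepts an RPC-triggered checkpoint only after every input
 * channel has received EndOfPartition.
 */
final class RpcTriggerGate {
    enum Decision { ACCEPT, DECLINE_TASK_NOT_READY }

    private final int numInputChannels;
    private final BitSet finishedChannels;

    RpcTriggerGate(int numInputChannels) {
        this.numInputChannels = numInputChannels;
        this.finishedChannels = new BitSet(numInputChannels);
    }

    /** Records that EndOfPartition arrived on the given input channel. */
    void onEndOfPartition(int channelIndex) {
        if (channelIndex < 0 || channelIndex >= numInputChannels) {
            throw new IndexOutOfBoundsException("channel " + channelIndex);
        }
        finishedChannels.set(channelIndex);
    }

    boolean allInputsFinished() {
        return finishedChannels.cardinality() == numInputChannels;
    }

    /** Declines the trigger while at least one input is still open. */
    Decision onRpcTrigger() {
        return allInputsFinished()
                ? Decision.ACCEPT
                : Decision.DECLINE_TASK_NOT_READY;
    }
}
```

With two input channels, a trigger arriving after only one EndOfPartition
would be declined, and the concern above is what the coordinator does with
that decline when failed checkpoints are not tolerated.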
As for postponing the JM notification: since the JM currently has no way of
knowing whether the task has received EndOfPartition from all the upstream
tasks, we might need to introduce a new RPC for reporting that state. And
since the triggering is not atomic, we may also run into synchronization
issues between the JM and the TM, which would introduce some complexity.
Thus another possible option might be to let the upstream task wait until
all the pending buffers in the result partition have been flushed before it
finishes. We could do the wait only for the PipelineResultPartition, so that
it does not affect batch jobs. With the waiting, the unaligned checkpoint
could continue to trigger the upstream task and skip the buffers in the
result partition. Since the result partition state would be kept in the
operator state of the last operator, after a failover we would find that the
last operator has a non-empty state, and we would restart the tasks
containing this operator to resend the snapshotted buffers. Of course this
would also introduce some complexity, and since the probability of a long
delay would be lower than in the completely aligned case, do you think it
would be OK for us to view it as an optimization and postpone it to future
versions?
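A rough, single-threaded sketch of this last option in Java, just to
illustrate the idea. PendingBufferModel and all of its methods are
hypothetical names of my own, not Flink APIs, and buffers are modeled as
plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical model (not Flink code) of a pipelined result partition whose
 * producing task may only finish once all pending buffers are consumed.
 */
final class PendingBufferModel {
    private final Deque<String> pendingBuffers = new ArrayDeque<>();
    private boolean endOfInput;

    /** The upstream task writes a buffer into the result partition. */
    void emit(String buffer) {
        if (endOfInput) {
            throw new IllegalStateException("task already reached end of input");
        }
        pendingBuffers.addLast(buffer);
    }

    /** The upstream task has no more records to produce. */
    void endInput() {
        endOfInput = true;
    }

    /** The downstream task consumes one buffer; returns null if none is pending. */
    String consumeOne() {
        return pendingBuffers.pollFirst();
    }

    /** What an unaligned checkpoint would store while the task is still waiting. */
    List<String> snapshotPendingBuffers() {
        return new ArrayList<>(pendingBuffers);
    }

    /** The task may report FINISHED only once nothing is left to flush. */
    boolean canFinish() {
        return endOfInput && pendingBuffers.isEmpty();
    }
}
```

While canFinish() is false the task is still running, so a checkpoint can
still be triggered on it and the pending buffers are snapshotted instead of
being waited for. Once the downstream task has drained the partition, the
task finishes with an empty partition.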
Best,
Yun
------------------------------------------------------------------
From:Khachatryan Roman <[email protected]>
Send Time:2021 Jan. 11 (Mon.) 05:46
To:Yun Gao <[email protected]>
Cc:Arvid Heise <[email protected]>; dev <[email protected]>; user
<[email protected]>
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Thanks a lot for your answers Yun,
> In detail, suppose we have a job with the graph A -> B -> C, and suppose
> that in one checkpoint A has reported FINISHED. CheckpointCoordinator would
> choose B as the new "source" to trigger the checkpoint via RPC. For task B,
> if it received the checkpoint trigger, it would know that all its preceding
> tasks are finished. It would then wait till all the InputChannels received
> EndOfPartition from the network (namely inputChannel.onBuffer() is called
> with EndOfPartition) and then take a snapshot of the input channels, as the
> normal unaligned checkpoints do for the InputChannel side. Then
> we would be able to ensure the finished tasks always have an empty state.
Probably it would be simpler to just decline the RPC-triggered checkpoint if
not all inputs of this task are finished (with
CHECKPOINT_DECLINED_TASK_NOT_READY).
But I wonder how significantly this waiting for EoP from every input will delay
performing the first checkpoint by B after becoming a new source. This may in
turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone JM notification from source until
its EoP is consumed?
Regards,
Roman