Thanks a lot for your answers, Yun,

> In detail, suppose we have a job with the graph A -> B -> C, and suppose in
> one checkpoint A has reported FINISHED. CheckpointCoordinator would
> choose B as the new "source" to trigger the checkpoint via RPC. When task B
> receives the checkpoint trigger, it would know that all its preceding tasks
> are finished; it would then wait till all the InputChannels have received
> EndOfPartition from the network (namely inputChannel.onBuffer() is called
> with EndOfPartition) and then take a snapshot of the input channels, as the
> normal unaligned checkpoint does on the InputChannel side. Then
> we would be able to ensure the finished tasks always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint
if not all inputs of this task are finished (with
CHECKPOINT_DECLINED_TASK_NOT_READY).
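
Roughly what I mean, as a minimal sketch only (the types and method names
below are invented for illustration, not the actual runtime API):

enum DeclineReason { CHECKPOINT_DECLINED_TASK_NOT_READY }

abstract class BarrierlessCheckpointHandler {
    // true once every input channel of this task has seen EndOfPartition
    abstract boolean allInputsFinished();
    abstract void performCheckpoint(long checkpointId);
    abstract void declineCheckpoint(long checkpointId, DeclineReason reason);

    void onTriggerCheckpointRpc(long checkpointId) {
        if (!allInputsFinished()) {
            // simpler than waiting for EoP: let the coordinator re-trigger later
            declineCheckpoint(checkpointId, DeclineReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
        } else {
            // all EndOfPartitions already consumed, nothing left upstream
            performCheckpoint(checkpointId);
        }
    }
}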

But I wonder how significantly this waiting for EoP from every input will
delay B's first checkpoint after it becomes a new source.
This may in turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone the JM notification from the
source until its EoP is consumed?

Regards,
Roman


On Thu, Jan 7, 2021 at 5:01 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Roman,
>
>    Many thanks for the feedback! I'll try to answer the issues inline:
>
> > 1. Option 1 is said to be not preferable because it wastes resources and
> > adds complexity (new event).
> > However, the resources would be wasted for a relatively short time
> > until the job finishes completely.
> > And compared to other options, complexity seems much lower. Or are
> > differences in task completion times so huge and so common?
>
> There might be mixed jobs with both bounded and unbounded sources; in this
> case, the resources for the finished part of the job could not be released
> (since the unbounded part never finishes).
>
> And Option 1 also complicates the semantics of EndOfPartition: if we hold
> the finished tasks but still need to notify the following tasks that all
> records have been sent, we would have to introduce some kind of
> pre-EndOfPartition message, which is similar to the current EndOfPartition
> but does not cause the channels to be released.
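>
> (A rough sketch only, with an invented name, of what such a message could
> look like:)
>
> // Invented name: signals "all records sent" like EndOfPartition does, but
> // is defined NOT to release the input channel when it is consumed.
> final class PreEndOfPartitionEvent implements java.io.Serializable {
>     static final PreEndOfPartitionEvent INSTANCE = new PreEndOfPartitionEvent();
>     private PreEndOfPartitionEvent() {}
> }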
>
> > 2. I think it would be helpful to describe how rescaling is handled in
> > Options 2 and 3 (or maybe it's not supported for jobs about to finish).
>
> For Options 2 and 3 we manage the state per operator, thus the process of
> rescaling would be the same as for a normal checkpoint.
> For example, suppose one operator resides in a task with parallelism 4;
> if 2 of the subtasks are finished, the state of the operator is now
> composed of the state of the 2 remaining subtask instances. If we rescale
> to 5 after failover, the state of the 2 previously remaining subtasks
> would be re-distributed to the 5 new subtasks after failover.
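>
> As a rough illustration (plain Java over list-style operator state; this
> is just a sketch, not the actual state re-distribution code):
>
> import java.util.ArrayList;
> import java.util.List;
>
> class RedistributeSketch {
>     // oldSubtaskStates holds the state lists of the 2 non-finished
>     // subtasks; entries are spread round-robin over the new subtasks.
>     static List<List<String>> redistribute(
>             List<List<String>> oldSubtaskStates, int newParallelism) {
>         List<List<String>> newSubtaskStates = new ArrayList<>();
>         for (int i = 0; i < newParallelism; i++) {
>             newSubtaskStates.add(new ArrayList<>());
>         }
>         int next = 0;
>         for (List<String> oldState : oldSubtaskStates) {
>             for (String entry : oldState) {
>                 newSubtaskStates.get(next % newParallelism).add(entry);
>                 next++;
>             }
>         }
>         return newSubtaskStates;
>     }
> }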
>
> If before failover all 4 subtasks are finished, the operator would be
> marked as finished. After failover the operator would still be marked as
> finished, and all the subtask instances of this operator would skip
> methods like open(), endOfInput() and close(), and would be excluded when
> taking checkpoints after failover.
>
>
> > 3. Option 3 assumes that the state of a finished task is not used.
> > That's true for operator state, but what about channel state (captured by
> > unaligned checkpoint)?
> > I think it still has to be sent downstream which invalidates this Option.
>
> For unaligned checkpoints, if in one checkpoint a subtask is marked as
> finished, then its descendant tasks would wait until all the records from
> the finished tasks are received before taking the checkpoint; thus in this
> case we would not have result partition state, but only channel state for
> the downstream tasks that are still running.
>
> In detail, suppose we have a job with the graph A -> B -> C, and suppose in
> one checkpoint A has reported FINISHED. CheckpointCoordinator would
> choose B as the new "source" to trigger the checkpoint via RPC. When task B
> receives the checkpoint trigger, it would know that all its preceding tasks
> are finished; it would then wait till all the InputChannels have received
> EndOfPartition from the network (namely inputChannel.onBuffer() is called
> with EndOfPartition) and then take a snapshot of the input channels, as the
> normal unaligned checkpoint does on the InputChannel side. Then
> we would be able to ensure the finished tasks always have an empty state.
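>
> A rough sketch of that waiting logic (all type and method names here are
> made up purely for illustration, not real runtime classes):
>
> abstract class NewSourceTaskSketch {
>     abstract int numberOfInputChannels();
>     // true once inputChannel.onBuffer() was called with EndOfPartition
>     abstract boolean hasReceivedEndOfPartition(int channelIndex);
>     abstract void waitForNextNetworkEvent() throws InterruptedException;
>     abstract void snapshotInputChannel(int channelIndex, long checkpointId);
>
>     void onCheckpointTrigger(long checkpointId) throws InterruptedException {
>         // wait until every InputChannel has seen EndOfPartition ...
>         for (int ch = 0; ch < numberOfInputChannels(); ch++) {
>             while (!hasReceivedEndOfPartition(ch)) {
>                 waitForNextNetworkEvent();
>             }
>         }
>         // ... then snapshot the input channels, as normal unaligned
>         // checkpoints do; the finished upstream tasks contribute no state.
>         for (int ch = 0; ch < numberOfInputChannels(); ch++) {
>             snapshotInputChannel(ch, checkpointId);
>         }
>     }
> }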
>
> I'll also update the FLIP to make it clearer.
>
> Best,
>  Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Khachatryan Roman <khachatryan.ro...@gmail.com>
> *Send Date:*Thu Jan 7 21:55:52 2021
> *Recipients:*Arvid Heise <ar...@ververica.com>
> *CC:*dev <dev@flink.apache.org>, user <u...@flink.apache.org>
> *Subject:*Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>> Thanks for starting this discussion (and sorry for probably duplicated
>> questions, I couldn't find them answered in FLIP or this thread).
>>
>> 1. Option 1 is said to be not preferable because it wastes resources and
>> adds complexity (new event).
>> However, the resources would be wasted for a relatively short time until
>> the job finishes completely.
>> And compared to other options, complexity seems much lower. Or are
>> differences in task completion times so huge and so common?
>>
>> 2. I think it would be helpful to describe how rescaling is handled in
>> Options 2 and 3 (or maybe it's not supported for jobs about to finish).
>>
>> 3. Option 3 assumes that the state of a finished task is not used. That's
>> true for operator state, but what about channel state (captured by
>> unaligned checkpoint)? I think it still has to be sent downstream which
>> invalidates this Option.
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>>> We could introduce an interface, sth like `RequiresFinalization` or
>>>> `FinalizationListener` (all bad names). The operator itself knows when
>>>> it is ready to completely shut down, Async I/O would wait for all
>>>> requests, sink would potentially wait for a given number of
>>>> checkpoints.
>>>> The interface would have a method like `isFinalized()` that the
>>>> framework can call after each checkpoint (and potentially at other
>>>> points)
>>>
>>>
>>> I think we are mixing two different things here that may require
>>> different solutions:
>>> 1. Tasks (=sink) that may need to do something with the final checkpoint.
>>> 2. Tasks that only finish after having finished operations that do not
>>> depend on data flow (async I/O, but I could also think of some timer
>>> actions in process functions).
>>>
>>> Your proposal would help most for the first case. The second case can be
>>> solved entirely with current methods without being especially complicated:
>>> - EOP is only emitted once Async I/O is done with all background tasks
>>> - All timers are fired in a process function (I think we rather want to
>>> fire immediately on EOP but that's a different discussion)
>>> The advantage of this approach over your idea is that you don't need to
>>> wait for a checkpoint to complete to check for finalization.
>>>
>>> Now let's look at the first case. I see two alternatives:
>>> - The new sink interface implicitly incorporates this listener. Since I
>>> don't see a use case outside sinks, we could simply add this method to the
>>> new sink interface.
>>> - We implicitly assume that a sink is done after having a successful
>>> checkpoint at the end. Then we just need a tag interface
>>> `RequiresFinalization`. It also feels like we should add the property
>>> `final` to checkpoint options to help the sink detect that this is the last
>>> checkpoint to be taken. We could also try to always have the final
>>> checkpoint without a tag interface on new sinks...
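>>>
>>> A sketch of that second alternative (just to illustrate; the names below
>>> are placeholders, not a concrete API proposal):
>>>
>>> // Tag interface: "this sink is only done after a final checkpoint".
>>> interface RequiresFinalization {}
>>>
>>> // Hypothetical `final` property on the checkpoint options handed to the
>>> // operator, so the sink can detect the last checkpoint to be taken.
>>> class CheckpointOptionsSketch {
>>>     final boolean finalCheckpoint;
>>>     CheckpointOptionsSketch(boolean finalCheckpoint) {
>>>         this.finalCheckpoint = finalCheckpoint;
>>>     }
>>> }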
>>>
>>> On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> This is somewhat unrelated to the discussion about how to actually do
>>>> the triggering when sources shut down; I'll write on that separately. I
>>>> just wanted to get this quick thought out.
>>>>
>>>> This is about letting operators decide whether they actually want to
>>>> wait for a final checkpoint, which is relevant at least for Async I/O
>>>> and potentially for sinks.
>>>>
>>>> We could introduce an interface, sth like `RequiresFinalization` or
>>>> `FinalizationListener` (all bad names). The operator itself knows when
>>>> it is ready to completely shut down, Async I/O would wait for all
>>>> requests, sink would potentially wait for a given number of
>>>> checkpoints.
>>>> The interface would have a method like `isFinalized()` that the
>>>> framework can call after each checkpoint (and potentially at other
>>>> points)
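>>>>
>>>> Roughly like this (just a sketch, the names are placeholders):
>>>>
>>>> // Operators that need to keep running until some external work is done
>>>> // (pending Async I/O requests, a number of completed checkpoints, ...).
>>>> interface FinalizationListener {
>>>>     // Called by the framework after each completed checkpoint (and
>>>>     // possibly at other points); the task may shut down once this is true.
>>>>     boolean isFinalized();
>>>> }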
>>>>
>>>> This way we would decouple that logic from things that don't actually
>>>> need it. What do you think?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
