Hi Roman,

Many thanks for the feedback! I'll try to answer the issues inline:

> 1. Option 1 is said to be not preferable because it wastes resources and adds 
> complexity (new event). 
> However, the resources would be wasted for a relatively short time until the 
> job finishes completely. 
> And compared to other options, complexity seems much lower. Or are 
> differences in task completion times so huge and so common?

There might be mixed jobs with both bounded sources and unbounded sources; in 
that case, the resources for the finished part of the job could not be 
released until the whole job finishes.

Option 1 also complicates the semantics of EndOfPartition: if we keep the 
finished tasks alive but still need to notify the downstream tasks that all 
records have been sent, we would have to introduce some kind of 
pre-EndOfPartition message, similar to the current EndOfPartition but without 
causing the channels to be released.
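
To make the concern concrete, such a message might look roughly like the 
sketch below (EndOfUserRecordsEvent is a made-up name for illustration, not 
an existing Flink class):

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.RuntimeEvent;

/**
 * Hypothetical "pre-EndOfPartition" event, sketched only to show the added
 * complexity: it tells the receiver that all user records have been sent,
 * but unlike EndOfPartitionEvent it must NOT release the input channel, so
 * consumers would need an extra "drained but still open" channel state.
 */
public final class EndOfUserRecordsEvent extends RuntimeEvent {

    // The event carries no payload, so (de)serialization is empty.
    @Override
    public void write(DataOutputView out) throws IOException {}

    @Override
    public void read(DataInputView in) throws IOException {}
}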

> 2. I think it would be helpful to describe how is rescaling handled in 
> Options 2 and 3 (or maybe it's not supported for jobs about to finish).

For Options 2 and 3 we manage state at the level of the operator, so 
rescaling works the same way as restoring from a normal checkpoint.
For example, suppose an operator resides in a task with parallelism 4 and 2 
of the subtasks have finished; the state of the operator is then composed 
of the state of the 2 remaining subtask instances. If we rescale to 5 after 
failover, the state of those 2 remaining subtasks is redistributed across 
the 5 new subtasks.
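
As a rough illustration of the redistribution (plain Java with invented 
names, not actual Flink internals):

import java.util.ArrayList;
import java.util.List;

public final class RescaleSketch {

    /**
     * Toy round-robin redistribution: the state partitions collected from
     * the subtasks that were still running (2 in the example above) are
     * spread over the new parallelism (5 in the example). The finished
     * subtasks contributed no partitions, so nothing of theirs is needed.
     */
    static <T> List<List<T>> redistribute(List<T> remainingPartitions, int newParallelism) {
        List<List<T>> assignment = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < remainingPartitions.size(); i++) {
            assignment.get(i % newParallelism).add(remainingPartitions.get(i));
        }
        return assignment;
    }
}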

If all 4 subtasks have finished before the failover, the operator is marked 
as finished. After the failover it remains marked as finished, all subtask 
instances of this operator skip methods like open(), endOfInput() and 
close(), and the operator is excluded from checkpoints taken after the 
failover.
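
In pseudo-Java (the names below are invented for this sketch, not the real 
runtime code), the lifecycle after failover would roughly be:

/** Illustrative sketch only; interface and flag names are made up. */
interface OperatorLifecycle {
    void open() throws Exception;
    void endOfInput() throws Exception;
    void close() throws Exception;
}

final class FinishedOperatorSketch {
    static void run(OperatorLifecycle op, boolean restoredAsFullyFinished) throws Exception {
        if (restoredAsFullyFinished) {
            // The operator was FINISHED in the restored checkpoint: skip the
            // whole lifecycle and exclude the operator from later checkpoints.
            return;
        }
        op.open();
        // ... process records ...
        op.endOfInput();
        op.close();
    }
}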


> 3. Option 3 assumes that the state of a finished task is not used. That's 
> true for operator state, but what about channel state (captured by unaligned 
> checkpoint)?
> I think it still has to be sent downstream which invalidates this Option.

For unaligned checkpoints, if a subtask is marked as finished in one 
checkpoint, its descendant tasks wait until all records from the finished 
tasks have been received before taking the checkpoint. Thus in this case 
there is no result partition state for the finished tasks, only input 
channel state for the downstream tasks that are still running.

In detail, suppose we have a job with the graph A -> B -> C, and in one 
checkpoint A has reported FINISHED. The CheckpointCoordinator would then 
choose B as the new "source" and trigger the checkpoint on it via RPC. When 
task B receives the checkpoint trigger, it knows that all its predecessor 
tasks are finished, so it waits until every InputChannel has received 
EndOfPartition from the network (namely, inputChannel.onBuffer() is called 
with EndOfPartition) and then snapshots the input channels, just as normal 
unaligned checkpoints do on the InputChannel side. This way we can ensure 
that the finished tasks always have an empty state.
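
Roughly, in a sketch with invented helper names (not the real network stack 
code), B's trigger path would be:

import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch of the trigger path; class and method names are invented. */
final class TriggerAfterPredecessorsFinishedSketch {

    interface InputChannelLike {
        // Completes once EndOfPartition has arrived on this channel.
        CompletableFuture<Void> getEndOfPartitionReceivedFuture();
    }

    /**
     * B is triggered via RPC because all its predecessors are FINISHED:
     * wait until EndOfPartition has arrived on every input channel, then
     * snapshot the (now complete) channel state.
     */
    static CompletableFuture<Void> triggerCheckpoint(List<InputChannelLike> channels) {
        CompletableFuture<?>[] drained =
                channels.stream()
                        .map(InputChannelLike::getEndOfPartitionReceivedFuture)
                        .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(drained)
                // All in-flight records are now captured as B's channel state;
                // the finished predecessor contributes no state at all.
                .thenRun(TriggerAfterPredecessorsFinishedSketch::snapshotInputChannels);
    }

    private static void snapshotInputChannels() {
        // ... write the buffered records into the unaligned checkpoint ...
    }
}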

I'll also update the FLIP to make these points clearer~

Best,
 Yun



 ------------------ Original Mail ------------------
Sender: Khachatryan Roman <khachatryan.ro...@gmail.com>
Send Date: Thu Jan 7 21:55:52 2021
Recipients: Arvid Heise <ar...@ververica.com>
CC: dev <dev@flink.apache.org>, user <u...@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion (and sorry for probably duplicated 
questions, I couldn't find them answered in FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and adds 
complexity (new event). 
However, the resources would be wasted for a relatively short time until the 
job finishes completely. 
And compared to other options, complexity seems much lower. Or are differences 
in task completion times so huge and so common?

2. I think it would be helpful to describe how is rescaling handled in Options 
2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's true 
for operator state, but what about channel state (captured by unaligned 
checkpoint)? I think it still has to be sent downstream which invalidates this 
Option.

Regards,
Roman

On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise <ar...@ververica.com> wrote:

We could introduce an interface, sth like `RequiresFinalization` or 
`FinalizationListener` (all bad names). The operator itself knows when 
it is ready to completely shut down, Async I/O would wait for all 
requests, sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the 
framework can call after each checkpoint (and potentially at other 
points)

I think we are mixing two different things here that may require different 
solutions:
1. Tasks (=sink) that may need to do something with the final checkpoint.
2. Tasks that only finish after having finished operations that do not depend 
on data flow (async I/O, but I could also think of some timer actions in 
process functions).

Your proposal would help most for the first case. The second case can solved 
entirely with current methods without being especially complicated: 
- EOP is only emitted once Async I/O is done with all background tasks
- All timers are fired in a process function (I think we rather want to fire 
immediately on EOP but that's a different discussion)
The advantage of this approach over your idea is that you don't need to wait 
for a checkpoint to complete to check for finalization.

Now let's look at the first case. I see two alternatives:
- The new sink interface implicitly incorporates this listener. Since I don't 
see a use case outside sinks, we could simply add this method to the new sink 
interface.
- We implicitly assume that a sink is done after having a successful checkpoint 
at the end. Then we just need a tag interface `RequiresFinalization`. It also 
feels like we should add the property `final` to checkpoint options to help the 
sink detect that this is the last checkpoint to be taken. We could also try to 
always have the final checkpoint without tag interface on new sinks...

On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek <aljos...@apache.org> wrote:
This is somewhat unrelated to the discussion about how to actually do 
the triggering when sources shut down, I'll write on that separately. I 
just wanted to get this quick thought out.

For letting operators decide whether they actually want to wait for a 
final checkpoint, which is relevant at least for Async I/O and 
potentially for sinks.

We could introduce an interface, sth like `RequiresFinalization` or 
`FinalizationListener` (all bad names). The operator itself knows when 
it is ready to completely shut down, Async I/O would wait for all 
requests, sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the 
framework can call after each checkpoint (and potentially at other 
points)

This way we would decouple that logic from things that don't actually 
need it. What do you think?

Best,
Aljoscha


