Hi Roman,

Many thanks for the feedback! I'll try to answer the issues inline:
> 1. Option 1 is said to be not preferable because it wastes resources and adds
> complexity (new event). However, the resources would be wasted for a
> relatively short time until the job finishes completely. And compared to
> other options, complexity seems much lower. Or are differences in task
> completion times so huge and so common?

There might be mixed jobs with both bounded and unbounded sources; in this case, the resources for the finished part of the job could not be released. Option 1 also complicates the semantics of EndOfPartition: if we hold the tasks but still need to notify the following tasks that all records have been sent, we would have to introduce some kind of pre-EndOfPartition message, similar to the current EndOfPartition but one that does not cause the channels to be released.

> 2. I think it would be helpful to describe how rescaling is handled in
> Options 2 and 3 (or maybe it's not supported for jobs about to finish).

For Options 2 and 3 we manage state at the granularity of an operator, so rescaling follows the same process as for a normal checkpoint. For example, suppose one operator resides in a task with parallelism 4 and 2 of the subtasks have finished; the state of the operator is then composed of the state of the 2 remaining subtask instances. If we rescale to 5 after a failover, the state of those 2 remaining subtasks is redistributed across the 5 new subtasks. If all 4 subtasks had finished before the failover, the operator is marked as finished; after the failover it stays marked as finished, all subtask instances of this operator skip methods like open(), endOfInput(), and close(), and the operator is excluded when taking checkpoints after the failover.

> 3. Option 3 assumes that the state of a finished task is not used. That's
> true for operator state, but what about channel state (captured by unaligned
> checkpoint)?
> I think it still has to be sent downstream which invalidates this Option.

For unaligned checkpoints, if a subtask is marked as finished in one checkpoint, its descendant tasks wait until all records from the finished tasks have been received before taking the checkpoint. Thus in this case we would have no result partition state, only input channel state for the downstream tasks that are still running.

In detail, suppose we have a job with the graph A -> B -> C, and suppose that in one checkpoint A has reported FINISHED. The CheckpointCoordinator would then choose B as the new "source" and trigger its checkpoint via RPC. When task B receives the checkpoint trigger, it knows that all its preceding tasks are finished, so it waits until every InputChannel has received EndOfPartition from the network (namely, inputChannel.onBuffer() is called with EndOfPartition) and then takes the snapshot of the input channels, just as normal unaligned checkpoints do on the InputChannel side. This way we can ensure that finished tasks always have an empty state.

I'll also revise the FLIP to make it clearer~

Best,
Yun

------------------Original Mail ------------------
Sender: Khachatryan Roman <khachatryan.ro...@gmail.com>
Send Date: Thu Jan 7 21:55:52 2021
Recipients: Arvid Heise <ar...@ververica.com>
CC: dev <dev@flink.apache.org>, user <u...@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion (and sorry for probably duplicated questions, I couldn't find them answered in the FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and adds complexity (new event). However, the resources would be wasted for a relatively short time until the job finishes completely. And compared to other options, complexity seems much lower. Or are differences in task completion times so huge and so common?

2. I think it would be helpful to describe how rescaling is handled in Options 2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's true for operator state, but what about channel state (captured by unaligned checkpoint)? I think it still has to be sent downstream, which invalidates this Option.

Regards,
Roman

On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise <ar...@ververica.com> wrote:

> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when it is
> ready to completely shut down, Async I/O would wait for all requests, sink
> would potentially wait for a given number of checkpoints. The interface
> would have a method like `isFinalized()` that the framework can call after
> each checkpoint (and potentially at other points)

I think we are mixing two different things here that may require different solutions:
1. Tasks (= sinks) that may need to do something with the final checkpoint.
2. Tasks that only finish after completing operations that do not depend on the data flow (async I/O, but I could also think of some timer actions in process functions).

Your proposal would help most for the first case. The second case can be solved entirely with current methods without being especially complicated:
- EOP is only emitted once Async I/O is done with all background tasks.
- All timers are fired in a process function (I think we rather want to fire immediately on EOP, but that's a different discussion).

The advantage of this approach over your idea is that you don't need to wait for a checkpoint to complete to check for finalization.

Now let's look at the first case. I see two alternatives:
- The new sink interface implicitly incorporates this listener. Since I don't see a use case outside sinks, we could simply add this method to the new sink interface.
- We implicitly assume that a sink is done after having a successful checkpoint at the end. Then we just need a tag interface `RequiresFinalization`. It also feels like we should add the property `final` to checkpoint options to help the sink detect that this is the last checkpoint to be taken.

We could also try to always have the final checkpoint without a tag interface on new sinks...

On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek <aljos...@apache.org> wrote:

This is somewhat unrelated to the discussion about how to actually do the triggering when sources shut down; I'll write on that separately. I just wanted to get this quick thought out.

For letting operators decide whether they actually want to wait for a final checkpoint, which is relevant at least for Async I/O and potentially for sinks: we could introduce an interface, sth like `RequiresFinalization` or `FinalizationListener` (all bad names). The operator itself knows when it is ready to completely shut down; Async I/O would wait for all requests, a sink would potentially wait for a given number of checkpoints. The interface would have a method like `isFinalized()` that the framework can call after each checkpoint (and potentially at other points).

This way we would decouple that logic from things that don't actually need it.

What do you think?

Best,
Aljoscha

--
Arvid Heise | Senior Java Developer

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
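[Editor's note: as an illustration of the `RequiresFinalization` / `isFinalized()` idea discussed in this thread, a minimal Java sketch might look like the following. All names here are the placeholder names from the discussion ("all bad names", as Aljoscha says), not actual Flink API.]

```java
// Sketch of the proposed finalization hook. `RequiresFinalization` and
// `isFinalized()` are placeholder names from the mailing-list thread,
// not real Flink interfaces.
interface RequiresFinalization {
    // The framework would call this after each completed checkpoint
    // (and potentially at other points) to ask whether the operator
    // has finished its remaining work and can shut down completely.
    boolean isFinalized();
}

// Example: an Async-I/O-style operator that is finalized only once all
// in-flight requests have completed.
class AsyncRequestOperator implements RequiresFinalization {
    private int inFlightRequests;

    AsyncRequestOperator(int inFlightRequests) {
        this.inFlightRequests = inFlightRequests;
    }

    // Called when one outstanding async request finishes.
    void requestCompleted() {
        inFlightRequests--;
    }

    @Override
    public boolean isFinalized() {
        return inFlightRequests == 0;
    }
}
```

Under this sketch the framework polls `isFinalized()` after each checkpoint completes and only then proceeds with the final shutdown, which is the decoupling Aljoscha describes: operators that don't need finalization simply never implement the interface.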