Thanks a lot for your answers Yun, > In detail, suppose we have a job with the graph A -> B -> C, and suppose in one checkpoint A has reported FINISHED; CheckpointCoordinator would > choose B as the new "source" to trigger checkpoints via RPC. For task B, if it receives a checkpoint trigger, it knows that all its predecessor tasks > are finished, so it waits until all the InputChannels have received EndOfPartition from the network (namely inputChannel.onBuffer() is called with > EndOfPartition) and then takes a snapshot of the input channels, as normal unaligned checkpoints do on the InputChannel side. Then > we would be able to ensure the finished tasks always have an empty state.
Probably it would be simpler to just decline the RPC-triggered checkpoint if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY). But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints. Maybe a better option would be to postpone the JM notification from the source until its EoP is consumed? Regards, Roman On Thu, Jan 7, 2021 at 5:01 PM Yun Gao <yungao...@aliyun.com> wrote: > Hi Roman, > > Many thanks for the feedback! I'll try to answer the issues inline: > > > 1. Option 1 is said to be not preferable because it wastes resources and > adds complexity (new event). > > However, the resources would be wasted for a relatively short time > until the job finishes completely. > > And compared to other options, complexity seems much lower. Or are > differences in task completion times so huge and so common? > > There might be mixed jobs with both bounded sources and unbounded sources; > in this case, the resources for the finished > part of the job could not be released. > > Option 1 also complicates the semantics of EndOfPartition, > since if we hold the tasks but still need to > notify the following tasks that all records have been sent, we would have to > introduce some kind of pre-EndOfPartition message, > which is similar to the current EndOfPartition but does not cause the > channels to be released. > > > 2. I think it would be helpful to describe how rescaling is handled in > Options 2 and 3 (or maybe it's not supported for jobs about to finish). > > For Options 2 and 3 we manage state at the unit of the operator, thus > the process of rescaling would be the same as for a normal checkpoint. 
> For example, suppose one operator resides in a task with parallelism 4; > if 2 of the subtasks are finished, the state of the operator is now > composed > of the state of the 2 remaining subtask instances. If we rescale to 5 after > failover, the state of the 2 previously remaining subtasks would be > re-distributed > to the 5 new subtasks after failover. > > If before the failover all 4 subtasks were finished, the operator would be > marked as finished; after failover the operator would still be marked as > finished, > and all the subtask instances of this operator would skip all the methods > like open(), endOfInput(), close() and would be excluded when taking > checkpoints > after failover. > > > > 3. Option 3 assumes that the state of a finished task is not used. > That's true for operator state, but what about channel state (captured by > unaligned checkpoint)? > > I think it still has to be sent downstream which invalidates this Option. > > For unaligned checkpoints, if in one checkpoint a subtask is marked as > finished, then its descendant tasks would wait until all the records are received > from the finished tasks before taking the checkpoint; thus in this case we > would not have result partition state, but only channel state for the > downstream tasks that are still running. > > In detail, suppose we have a job with the graph A -> B -> C, and suppose in > one checkpoint A has reported FINISHED; CheckpointCoordinator would > choose B as the new "source" to trigger checkpoints via RPC. For task B, if > it receives a checkpoint trigger, it knows that all its predecessor tasks > are finished, so it waits until all the InputChannels have received > EndOfPartition from the network (namely inputChannel.onBuffer() is called > with > EndOfPartition) and then takes a snapshot of the input channels, as > normal unaligned checkpoints do on the InputChannel side. Then > we would be able to ensure the finished tasks always have an empty state. 
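[Editor's note] To make the wait-for-EndOfPartition step concrete, here is a minimal toy sketch in plain Java. It is not actual Flink code: `InputChannel` and `NewSourceTask` are simplified stand-ins for the real classes, and records are modeled as strings. It only illustrates the idea that task B defers its RPC-triggered checkpoint until every input channel has seen EndOfPartition, so the snapshot contains just the in-flight records and the finished upstream tasks contribute no state.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a network input channel of task B.
class InputChannel {
    private final List<String> buffered = new ArrayList<>();
    private boolean endOfPartition = false;

    // Simplified analogue of inputChannel.onBuffer().
    void onBuffer(String record) {
        if ("EndOfPartition".equals(record)) {
            endOfPartition = true;   // channel is fully drained
        } else {
            buffered.add(record);    // in-flight record to capture in the snapshot
        }
    }

    boolean receivedEndOfPartition() { return endOfPartition; }

    List<String> snapshot() { return new ArrayList<>(buffered); }
}

// Toy stand-in for task B acting as the new "source" for checkpoint triggers.
class NewSourceTask {
    private final List<InputChannel> inputs;

    NewSourceTask(List<InputChannel> inputs) { this.inputs = inputs; }

    // Returns the channel state to put into the checkpoint, or null while the
    // task still has to wait for EndOfPartition on some channel (alternatively
    // it could decline with CHECKPOINT_DECLINED_TASK_NOT_READY, as Roman suggests).
    List<String> triggerCheckpoint() {
        for (InputChannel ch : inputs) {
            if (!ch.receivedEndOfPartition()) {
                return null;
            }
        }
        List<String> channelState = new ArrayList<>();
        for (InputChannel ch : inputs) {
            channelState.addAll(ch.snapshot());
        }
        return channelState;
    }
}
```

In this model, once every channel reports EndOfPartition the snapshot holds only the still-buffered records of the running downstream task, matching the claim that the finished tasks themselves always have an empty state.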
> > I'll also optimize the FLIP to make it clearer~ > > Best, > Yun > > > ------------------Original Mail ------------------ > *Sender:*Khachatryan Roman <khachatryan.ro...@gmail.com> > *Send Date:*Thu Jan 7 21:55:52 2021 > *Recipients:*Arvid Heise <ar...@ververica.com> > *CC:*dev <dev@flink.apache.org>, user <u...@flink.apache.org> > *Subject:*Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished > >> Thanks for starting this discussion (and sorry for possibly duplicate >> questions, I couldn't find them answered in the FLIP or this thread). >> >> 1. Option 1 is said to be not preferable because it wastes resources and >> adds complexity (new event). >> However, the resources would be wasted for a relatively short time until >> the job finishes completely. >> And compared to other options, complexity seems much lower. Or are >> differences in task completion times so huge and so common? >> >> 2. I think it would be helpful to describe how rescaling is handled in >> Options 2 and 3 (or maybe it's not supported for jobs about to finish). >> >> 3. Option 3 assumes that the state of a finished task is not used. That's >> true for operator state, but what about channel state (captured by >> unaligned checkpoint)? I think it still has to be sent downstream which >> invalidates this Option. >> >> Regards, >> Roman >> >> >> On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise <ar...@ververica.com> wrote: >> >>> We could introduce an interface, sth like `RequiresFinalization` or >>>> `FinalizationListener` (all bad names). The operator itself knows when >>>> it is ready to completely shut down, Async I/O would wait for all >>>> requests, sink would potentially wait for a given number of >>>> checkpoints. >>>> The interface would have a method like `isFinalized()` that the >>>> framework can call after each checkpoint (and potentially at other >>>> points) >>> >>> >>> I think we are mixing two different things here that may require >>> different solutions: >>> 1. 
Tasks (=sink) that may need to do something with the final checkpoint. >>> 2. Tasks that only finish after having finished operations that do not >>> depend on data flow (async I/O, but I could also think of some timer >>> actions in process functions). >>> >>> Your proposal would help most for the first case. The second case can be >>> solved entirely with current methods without being especially complicated: >>> - EOP is only emitted once Async I/O is done with all background tasks >>> - All timers are fired in a process function (I think we'd rather want to >>> fire immediately on EOP but that's a different discussion) >>> The advantage of this approach over your idea is that you don't need to >>> wait for a checkpoint to complete to check for finalization. >>> >>> Now let's look at the first case. I see two alternatives: >>> - The new sink interface implicitly incorporates this listener. Since I >>> don't see a use case outside sinks, we could simply add this method to the >>> new sink interface. >>> - We implicitly assume that a sink is done after having a successful >>> checkpoint at the end. Then we just need a tag interface >>> `RequiresFinalization`. It also feels like we should add the property >>> `final` to the checkpoint options to help the sink detect that this is the last >>> checkpoint to be taken. We could also try to always have the final >>> checkpoint without a tag interface on new sinks... >>> >>> On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek <aljos...@apache.org> >>> wrote: >>> >>>> This is somewhat unrelated to the discussion about how to actually do >>>> the triggering when sources shut down, I'll write on that separately. I >>>> just wanted to get this quick thought out. >>>> >>>> For letting operators decide whether they actually want to wait for a >>>> final checkpoint, which is relevant at least for Async I/O and >>>> potentially for sinks. 
>>>> >>>> We could introduce an interface, sth like `RequiresFinalization` or >>>> `FinalizationListener` (all bad names). The operator itself knows when >>>> it is ready to completely shut down, Async I/O would wait for all >>>> requests, sink would potentially wait for a given number of >>>> checkpoints. >>>> The interface would have a method like `isFinalized()` that the >>>> framework can call after each checkpoint (and potentially at other >>>> points) >>>> >>>> This way we would decouple that logic from things that don't actually >>>> need it. What do you think? >>>> >>>> Best, >>>> Aljoscha >>>> >>> >>> >>> -- >>> >>> Arvid Heise | Senior Java Developer >>> >>> <https://www.ververica.com/> >>> >>> Follow us @VervericaData >>> >>> -- >>> >>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>> Conference >>> >>> Stream Processing | Event Driven | Real Time >>> >>> -- >>> >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>> >>> -- >>> Ververica GmbH >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >>> (Toni) Cheng >>> >>
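[Editor's note] A minimal Java sketch of the `RequiresFinalization` / `isFinalized()` idea Aljoscha describes above. All names are placeholders (he himself calls them "bad names"), not a real Flink API; the operator here is a simplified Async-I/O-style example that only reports itself finalized once all outstanding requests have completed.

```java
// Hypothetical tag/listener interface; not part of Flink's actual API.
interface RequiresFinalization {
    // The framework would call this after each completed checkpoint
    // (and potentially at other points) to decide whether the task
    // can fully shut down.
    boolean isFinalized();
}

// Example: an Async-I/O-style operator that must drain its in-flight
// requests before it can be finalized.
class AsyncWaitOperator implements RequiresFinalization {
    private int pendingRequests;

    AsyncWaitOperator(int pendingRequests) {
        this.pendingRequests = pendingRequests;
    }

    // Called when one outstanding async request finishes.
    void requestCompleted() {
        pendingRequests--;
    }

    @Override
    public boolean isFinalized() {
        return pendingRequests == 0;
    }
}
```

A sink that waits for a given number of checkpoints would implement `isFinalized()` analogously, counting completed checkpoints instead of pending requests.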