Hi Yun,

1. I think this is an orthogonal issue that I'd solve separately.
My gut feeling is that we should only address it for new sinks, where we
decouple the semantics of commits and checkpoints anyway.
@Aljoscha Krettek <aljos...@apache.org>, any idea on this one?

2. I'm not sure I fully get it. Let's assume we have a source partition
that finishes before the first checkpoint. Then we would need to store the
finished state of the subtask somehow. So I assume we still need to
trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was
assuming we want to have it more fine-grained on OperatorSubtaskState.
Maybe we can store the flag inside managed or raw state without changing
the format?
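
A minimal sketch of the granularity difference, under stated assumptions:
SubtaskSnapshot and OperatorSnapshot are hypothetical stand-ins, not Flink's
OperatorSubtaskState and OperatorState, and the serialization format is not
modeled at all. It only shows that an operator-level flag can be derived from
per-subtask flags, but not the other way around.

```java
import java.util.HashMap;
import java.util.Map;

public class FinishedFlagSketch {

    /** Hypothetical per-subtask snapshot; NOT Flink's OperatorSubtaskState. */
    public static final class SubtaskSnapshot {
        public final boolean finished;

        public SubtaskSnapshot(boolean finished) {
            this.finished = finished;
        }
    }

    /** Hypothetical per-operator snapshot; NOT Flink's OperatorState. */
    public static final class OperatorSnapshot {
        private final int parallelism;
        private final Map<Integer, SubtaskSnapshot> subtasks = new HashMap<>();

        public OperatorSnapshot(int parallelism) {
            this.parallelism = parallelism;
        }

        public void put(int subtaskIndex, SubtaskSnapshot snapshot) {
            subtasks.put(subtaskIndex, snapshot);
        }

        /** Fine-grained view: has this particular subtask finished? */
        public boolean isSubtaskFinished(int subtaskIndex) {
            SubtaskSnapshot snapshot = subtasks.get(subtaskIndex);
            return snapshot != null && snapshot.finished;
        }

        /** Coarse view: true only if every subtask reported itself finished. */
        public boolean isFullyFinished() {
            for (int i = 0; i < parallelism; i++) {
                if (!isSubtaskFinished(i)) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        // A source with two subtasks: partition 0 is done, partition 1 still runs.
        OperatorSnapshot source = new OperatorSnapshot(2);
        source.put(0, new SubtaskSnapshot(true));
        source.put(1, new SubtaskSnapshot(false));

        System.out.println("subtask 0 finished: " + source.isSubtaskFinished(0));
        System.out.println("subtask 1 finished: " + source.isSubtaskFinished(1));
        System.out.println("operator finished:  " + source.isFullyFinished());
    }
}
```

With a single operator-level flag, the partially finished source above could
only be recorded as "not finished", so subtask 0 would be re-run after a
failover.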



On Fri, Dec 25, 2020 at 8:39 AM Yun Gao <yungao...@aliyun.com> wrote:

>    Hi all,
>
>          I tested the previous PoC with the current tests and found some
> new issues that might cause disagreement. Sorry that this may also reverse
> some conclusions on earlier problems:
>
>
>      1. Which operators should wait for one more checkpoint before closing?
>
>         One motivation for this FLIP is to ensure that a 2PC sink commits the
> last part of its data before it is closed, which means the sink operator
> needs to wait for one more checkpoint: onEndOfInput() -> waitForCheckpoint()
> -> notifyCheckpointComplete() -> close(). This raises the question of which
> operators should wait for a checkpoint. Possible options are:
>                  a. Make all operators (or UDFs) that implement the
> notifyCheckpointComplete method wait for one more checkpoint. One
> exception: since we can only snapshot one or all tasks of a legacy
> source operator to avoid data repetition[1], we could not support legacy
> source operators and their chained operators waiting for checkpoints, since
> there would be a deadlock if some of the tasks are finished. This would
> finally be solved once legacy sources are deprecated. The PoC uses this
> option for now.
>                 b. Make operators (or UDFs) implement a special marker
> interface in order to wait for one more checkpoint.
>
>
>    2. Do we need to handle the case where tasks finish before they are
> triggered?
>
>       Previously I thought we could postpone it. However, during testing I
> found that it might cause problems, since by default a checkpoint failure
> causes a job failover, and the job would also need to wait for another
> interval to trigger the next checkpoint. To pass the tests, I updated the
> PoC to include this part, and we may want to think twice about whether to
> include it or use some other option.
>
> 3. How do we extend the checkpoint meta with a new format?
>
>     Sorry, I previously gave a wrong estimate. After I extracted a
> sub-component to (de)serialize operator state, I found that the problem just
> moves to the new OperatorStateSerializer. The problem seems to be that v2,
> v3 and v4 have different fields and thus use different processes to
> (de)serialize, which is a bit different from the case where we have fixed
> steps and each step has different logic. Thus we might either
>      a. Use a base class for every two versions.
>      b. Or have a unified framework that contains all possible fields
> across all versions, and use an empty field serializer to skip some fields
> in each version.
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks
>
> ------------------------------------------------------------------
> From:Yun Gao <yungao...@aliyun.com.INVALID>
> Send Time:2020 Dec. 16 (Wed.) 11:07
> To:Aljoscha Krettek <aljos...@apache.org>; dev <dev@flink.apache.org>;
> user <u...@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>      Hi Aljoscha,
>
>         Many thanks for the feedback! For the remaining issues:
>
>
>       > 1. You mean we would insert "artificial" barriers for barrier 2 in 
> case we receive  EndOfPartition while other inputs have already received 
> barrier 2? I think that makes sense, yes.
>
>
>       Yes, exactly. I would like to insert "artificial" barriers in case
> we receive EndOfPartition while other inputs have already received barrier
> 2, and also in the similar cases where some input channels receive
> EndOfPartition while checkpoint 2 is ongoing, or where the task receives a
> checkpoint trigger directly after all its preceding tasks have finished but
> it has not yet received their EndOfPartition.
>
>
>      > 3. This indeed seems complex. Maybe we could switch to using 
> composition instead of inheritance to make this more extensible?
>
>
>     I re-checked the code, and now I think composition would be better: it
> avoids a complex inheritance hierarchy by exposing the changed part,
> `(de)serializeOperatorState`. I'll update the PoC to change this part.
> Many thanks for the suggestion!
>
>
>    > 4. Don't we currently have the same problem? Even right now source tasks 
> and non-source tasks behave differently when it comes to checkpoints. Are you 
> saying we should fix that or would the new work introduce even
> more duplicate code?
>
>
>   Currently we never trigger non-source tasks, so the triggerCheckpoint
> logic is implemented in the base StreamTask class and is only used by the
> source tasks. However, after the change the non-source tasks would also get
> triggered, with a different behavior, so we might not be able to continue
> using this pattern.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Aljoscha Krettek <aljos...@apache.org>
> Send Time:2020 Dec. 15 (Tue.) 18:11
> To:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> Thanks for the thorough update! I'll answer inline.
>
> On 14.12.20 16:33, Yun Gao wrote:
>
> >      1. To take EndOfPartition into account for barrier alignment
> > at the TM side, we now tend to decouple the logic for EndOfPartition from
> > the normal alignment behavior to avoid complex interference (which
> > seems hard to track). We could do so by inserting suitable barriers for
> > input channels that have received but not yet processed EndOfPartition.
> > For example, if a task with four inputs has received barrier 2 from two
> > input channels, but the other two inputs do not receive barrier 2 before
> > EndOfPartition because the preceding tasks have finished, we could then
> > insert barrier 2 for the last two channels so that we can still finish
> > checkpoint 2.
>
> You mean we would insert "artificial" barriers for barrier 2 in case we
> receive  EndOfPartition while other inputs have already received barrier
> 2? I think that makes sense, yes.
>
>
> >      2. As we have discussed, if a task finishes while we are triggering
> > the tasks, it would cause a checkpoint failure and we should re-trigger
> > its descendants. But if possible, we think we might skip this issue in the
> > first version to reduce the implementation complexity, since it should not
> > affect correctness. We could consider supporting it in later versions.
>
> I think this should be completely fine.
>
>
> >      3. We would have to add a field isFinished to OperatorState so that
> > we do not re-run finished sources after failover. However, this would
> > require a new version of the checkpoint meta. Currently Flink has an
> > abstract MetaV2V3SerializerBase, and V2 and V3 extend it to share some
> > implementation. To add V4, which differs from V3 in only one field,
> > the current PoC wants to introduce a new MetaV3V4SerializerBase that
> > extends MetaV2V3SerializerBase to share implementation between V3 and V4.
> > This might look a little complex, and we might need a general mechanism to
> > extend the checkpoint meta format.
>
> This indeed seems complex. Maybe we could switch to using composition
> instead of inheritance to make this more extensible?
>
>
> >      4. With the change, StreamTask would have two types of subclasses
> > according to how they implement triggerCheckpoint: source tasks that
> > perform checkpoints immediately, and non-source tasks that notify the
> > CheckpointBarrierHandler in some way. However, since we have multiple
> > source tasks (legacy and new source) and multiple non-source tasks
> > (one-input, two-input, multiple-input), multiple subclasses would share
> > the same implementation, causing code duplication. Currently the PoC
> > introduces a new level of abstraction, namely SourceStreamTasks and
> > NonSourceStreamTasks, but what makes it more complicated is that
> > StreamingIterationHead extends OneInputStreamTask but needs to perform
> > checkpoints the way source tasks do.
>
> Don't we currently have the same problem? Even right now source tasks
> and non-source tasks behave differently when it comes to checkpoints.
> Are you saying we should fix that or would the new work introduce even
> more duplicate code?
>
>
>

-- 

Arvid Heise | Senior Java Developer

