Hi Arvid, 

         Many thanks for the feedback!

         For the second issue, sorry, I might not have made it very clear. I was 
initially thinking of the case where, for example, for a job with graph A -> B -> 
C, when we compute which tasks to trigger, A is still running, so we trigger A 
to start the checkpoint. However, before the trigger message reaches A, A 
finishes, and the trigger message fails because the task is not found. If we do 
not handle this case, the checkpoint would fail due to timeout. By default, a 
failed checkpoint causes job failure, and we would also need to wait one 
checkpoint interval for the next checkpoint. One solution would be to check all 
the pending checkpoints and trigger B instead when the JM is notified that A 
has finished.
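
The proposed fix can be sketched roughly as follows (in Python for brevity; the 
real logic would live in Flink's Java CheckpointCoordinator, and all names here 
are hypothetical): when the JM learns that a task finished before the trigger 
message reached it, each pending checkpoint re-routes its trigger to that 
task's closest still-running descendants.

```python
class PendingCheckpoint:
    def __init__(self, tasks_to_trigger):
        self.tasks_to_trigger = set(tasks_to_trigger)
        self.triggered = []

    def trigger(self, task):
        self.triggered.append(task)


def on_task_finished(finished, pending, downstream, running):
    """Re-route pending triggers from a finished task to running descendants."""
    for cp in pending:
        if finished not in cp.tasks_to_trigger:
            continue
        cp.tasks_to_trigger.discard(finished)
        # Descend the job graph until we reach tasks that are still running.
        stack = list(downstream.get(finished, []))
        while stack:
            task = stack.pop()
            if task in running:
                cp.tasks_to_trigger.add(task)
                cp.trigger(task)          # e.g. send the trigger to B instead
            else:
                stack.extend(downstream.get(task, []))  # descend further, to C


# Job graph A -> B -> C; A finished before the trigger message arrived.
downstream = {"A": ["B"], "B": ["C"]}
cp = PendingCheckpoint(["A"])
on_task_finished("A", [cp], downstream, running={"B", "C"})
print(cp.triggered)  # ['B']
```

If B had also finished, the walk would continue downstream and trigger C, so 
the checkpoint never has to wait for a timeout.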

       For the third issue, it should work if we store a special value for some 
field in OperatorState or OperatorSubtaskState. For example, we might store a 
special subtaskState map inside the OperatorState to mark it as finished, since 
a finished operator should always have an empty state. Many thanks for the 
advice! I'll try this method. 
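
The idea can be sketched like this (Python for brevity; the real classes are 
Flink's Java OperatorState / OperatorSubtaskState, and the sentinel and field 
names here are made up for illustration): because a finished operator always 
has empty state, a special sentinel entry in the subtask-state map can encode 
"finished" without adding a new field to the serialized format.

```python
FINISHED = object()  # sentinel stored in place of a real subtask state


class OperatorState:
    def __init__(self, operator_id):
        self.operator_id = operator_id
        self.subtask_states = {}  # subtask index -> state

    def mark_finished(self):
        # Legal only because a finished operator has no state to keep.
        assert not self.subtask_states, "finished operator must have empty state"
        self.subtask_states[-1] = FINISHED  # special marker entry

    def is_finished(self):
        return self.subtask_states.get(-1) is FINISHED


state = OperatorState("op-1")
state.mark_finished()
print(state.is_finished())  # True
```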

Best,
 Yun



------------------------------------------------------------------
From:Arvid Heise <ar...@ververica.com>
Send Time:2021 Jan. 5 (Tue.) 17:16
To:Yun Gao <yungao...@aliyun.com>
Cc:Aljoscha Krettek <aljos...@apache.org>; dev <dev@flink.apache.org>; user 
<u...@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

1. I'd think that this is an orthogonal issue, which I'd solve separately. My 
gut feeling says that this is something we should only address for new sinks 
where we decouple the semantics of commits and checkpoints anyways. @Aljoscha 
Krettek any idea on this one?

2. I'm not sure I get it completely. Let's assume we have a source partition 
that is finished before the first checkpoint. Then, we would need to store the 
finished state of the subtask somehow. So I'm assuming, we still need to 
trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was 
assuming we want to have it more fine-grained on OperatorSubtaskState. Maybe we 
can store the flag inside managed or raw state without changing the format?



On Fri, Dec 25, 2020 at 8:39 AM Yun Gao <yungao...@aliyun.com> wrote:

   Hi all,

         I tested the previous PoC against the current tests and found some new 
issues that might cause divergence; sorry, there might also be some reversals 
of some previous conclusions:

     1. Which operators should wait for one more checkpoint before close?

        One motivation for this FLIP is to ensure that the 2PC sink commits the 
last part of its data before it is closed, which means the sink operator needs 
to wait for one more checkpoint, i.e. onEndOfInput() -> waitForCheckpoint() -> 
notifyCheckpointComplete() -> close(). This leads to the question of which 
operators should wait for a checkpoint. Possible options are:
                 a. Make all operators (or UDFs) that implement the 
notifyCheckpointComplete method wait for one more checkpoint. One exception is 
that, since we can only snapshot one or all tasks of a legacy source operator 
to avoid data repetition [1], we could not support legacy source operators and 
their chained operators waiting for checkpoints, as there would be a deadlock 
if only part of the tasks have finished; this would finally be resolved once 
the legacy sources are deprecated. The PoC uses this option for now.
                 b. Make only operators (or UDFs) that implement a special 
marker interface wait for one more checkpoint.  
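
The shutdown sequence above can be sketched as driver pseudocode (Python for 
brevity; the method names follow the email, but the Sink class and driver 
function here are illustrative, not Flink's actual API): a 2PC sink must see 
one more completed checkpoint after end-of-input, so it can commit its final 
transaction before close().

```python
class TwoPhaseCommitSink:
    def __init__(self):
        self.events = []

    def on_end_of_input(self):
        # Last data received; the final transaction is still open.
        self.events.append("onEndOfInput")

    def notify_checkpoint_complete(self, checkpoint_id):
        # Commit the final transaction once the checkpoint is durable.
        self.events.append(f"commit@{checkpoint_id}")

    def close(self):
        self.events.append("close")


def shut_down(sink, next_checkpoint_id):
    sink.on_end_of_input()
    # waitForCheckpoint(): in reality this blocks until the next
    # checkpoint completes; here we invoke the callback directly.
    sink.notify_checkpoint_complete(next_checkpoint_id)
    sink.close()  # only close after the commit, so no data is lost


sink = TwoPhaseCommitSink()
shut_down(sink, next_checkpoint_id=3)
print(sink.events)  # ['onEndOfInput', 'commit@3', 'close']
```

Closing before notifyCheckpointComplete() would drop the final transaction, 
which is exactly what this ordering prevents.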


   2. Do we need to handle the case that tasks finish before being triggered?

      Previously I thought we could postpone this; however, during testing I 
found that it might cause problems, since by default a checkpoint failure 
causes a job failover, and the job would also need to wait another interval to 
trigger the next checkpoint. To pass the tests, I updated the PoC to include 
this part, and we may want to reconsider whether to include it or use some 
other option.

3. How do we extend the checkpoint meta with a new format?

    Sorry, I previously gave a wrong estimation: after extracting a 
sub-component for (de)serializing operator state, I found the problem just 
moves to the new OperatorStateSerializer. The problem seems to be that v2, v3 
and v4 have different fields, so they use different processes when 
(de)serializing, which is a bit different from the case where we have fixed 
steps and each step has different logic. Thus we might either:
     a. Use a base class for each pair of versions.
     b. Have a unified framework containing all the possible fields across all 
versions, and use an empty field serializer to skip the missing fields in each 
version.
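
Option (b) could look roughly like this (sketched in Python; the real code 
would be Java in Flink's checkpoint metadata serializers, and the version 
numbers and field names here are made up for illustration): each version is a 
table of field writers over one unified field list, and a version that lacks a 
field plugs in a no-op "empty field serializer".

```python
def write_int(out, value):
    out.append(value)          # a real field serializer


def skip(out, value):
    pass                       # empty field serializer: field absent in version


# Unified field order; each version picks real or no-op writers per field.
VERSION_FIELDS = {
    3: [("parallelism", write_int), ("is_finished", skip)],
    4: [("parallelism", write_int), ("is_finished", write_int)],
}


def serialize(version, record):
    out = []
    for field, writer in VERSION_FIELDS[version]:
        writer(out, record[field])
    return out


record = {"parallelism": 4, "is_finished": 1}
print(serialize(3, record))  # [4]     -- v3 has no is_finished field
print(serialize(4, record))  # [4, 1]  -- v4 writes it
```

The per-version logic then lives in a declarative table rather than in an 
inheritance hierarchy, so adding v5 means adding one row, not a new base class.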

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks

------------------------------------------------------------------
From:Yun Gao <yungao...@aliyun.com.INVALID>
Send Time:2020 Dec. 16 (Wed.) 11:07
To:Aljoscha Krettek <aljos...@apache.org>; dev <dev@flink.apache.org>; user 
<u...@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

     Hi Aljoscha,

        Many thanks for the feedback! For the remaining issues:

      > 1. You mean we would insert "artificial" barriers for barrier 2 in case 
we receive  EndOfPartition while other inputs have already received barrier 2? 
I think that makes sense, yes.

      Yes, exactly. I would like to insert "artificial" barriers in case we 
receive EndOfPartition while other inputs have already received barrier 2, and 
also for the similar cases where some input channels receive EndOfPartition 
while checkpoint 2 is ongoing, or where the task directly receives the 
checkpoint trigger after all the precedent tasks have finished but before it 
has received their EndOfPartition.
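
The barrier-insertion idea can be sketched like this (Python for brevity; the 
real logic would live in Flink's Java CheckpointBarrierHandler, and the class 
and method names here are illustrative): when a channel delivers 
EndOfPartition while an alignment for checkpoint n is in progress, we feed an 
artificial barrier n into that channel so the alignment completes normally.

```python
class Aligner:
    def __init__(self, num_channels):
        self.num_channels = num_channels
        self.seen = set()            # channels that delivered the current barrier
        self.current_barrier = None
        self.completed = []

    def on_barrier(self, channel, barrier_id):
        self.current_barrier = barrier_id
        self.seen.add(channel)
        if len(self.seen) == self.num_channels:
            self.completed.append(barrier_id)  # alignment done
            self.seen.clear()

    def on_end_of_partition(self, channel):
        # This channel will never send barrier n itself: insert it artificially.
        if self.current_barrier is not None and channel not in self.seen:
            self.on_barrier(channel, self.current_barrier)


# Four inputs; two deliver barrier 2, then the other two finish upstream.
aligner = Aligner(num_channels=4)
aligner.on_barrier(0, 2)
aligner.on_barrier(1, 2)
aligner.on_end_of_partition(2)  # finished upstream: inject barrier 2
aligner.on_end_of_partition(3)
print(aligner.completed)  # [2]
```

Without the injection, the alignment would wait forever on channels 2 and 3 
and checkpoint 2 would time out.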

     > 3. This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

    I re-checked the code, and now I think composition would be better for 
avoiding a complex inheritance hierarchy, by exposing the changed part, 
`(de)serializeOperatorState`. I'll update the PoC to change this part. Many 
thanks for the suggestion!

   > 4. Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. Are you 
saying we should fix that or would the new work introduce even 
more duplicate code?

  Currently, since we never trigger non-source tasks, the triggerCheckpoint 
logic is implemented in the base StreamTask class and is used only by the 
source tasks. However, after the change, the non-source tasks would also get 
triggered, with a different behavior, so we might not be able to continue 
using this pattern.
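
One composition-based alternative to more StreamTask subclasses could be 
sketched as follows (Python for brevity; CheckpointTrigger-style strategy 
objects and the names here are hypothetical, not existing Flink classes): each 
task is configured with a trigger behavior, so source and non-source tasks can 
share one task class instead of splitting the hierarchy.

```python
class PerformImmediately:
    """Source-style behavior: perform the checkpoint right away."""
    def trigger(self, task, checkpoint_id):
        return f"{task}: perform checkpoint {checkpoint_id} immediately"


class NotifyBarrierHandler:
    """Non-source-style behavior: hand the trigger to the barrier handler."""
    def trigger(self, task, checkpoint_id):
        return f"{task}: notify CheckpointBarrierHandler of {checkpoint_id}"


class StreamTask:
    def __init__(self, name, trigger_behavior):
        self.name = name
        self.trigger_behavior = trigger_behavior  # injected, not inherited

    def trigger_checkpoint(self, checkpoint_id):
        return self.trigger_behavior.trigger(self.name, checkpoint_id)


source = StreamTask("source", PerformImmediately())
mapper = StreamTask("map", NotifyBarrierHandler())
print(source.trigger_checkpoint(5))
print(mapper.trigger_checkpoint(5))
```

A special case like an iteration head could then reuse the source-style 
behavior without changing its place in the class hierarchy.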

Best,
Yun


------------------------------------------------------------------
From:Aljoscha Krettek <aljos...@apache.org>
Send Time:2020 Dec. 15 (Tue.) 18:11
To:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>      1. To include EndOfPartition in the barrier alignment on the TM side, 
> we now tend to decouple the logic for EndOfPartition from the normal 
> alignment behavior to avoid complex interference (which seems hard to track). 
> We could do so by inserting suitable barriers for input channels that have 
> received but not yet processed EndOfPartition. For example, if a task with 
> four inputs has received barrier 2 from two input channels, but the other two 
> inputs did not receive barrier 2 before EndOfPartition because their 
> precedent tasks finished, we could then insert barrier 2 for the last two 
> channels so that we can still finish checkpoint 2.

You mean we would insert "artificial" barriers for barrier 2 in case we 
receive  EndOfPartition while other inputs have already received barrier 
2? I think that makes sense, yes.

>      2. As we have discussed, if a task finishes while we are triggering the 
> tasks, it would cause a checkpoint failure, and we should re-trigger its 
> descendants. But if possible, we think we might skip this issue in the first 
> version to reduce the implementation complexity, since it should not affect 
> correctness. We could consider supporting it in the following versions.

I think this should be completely fine.

>      3. We would have to add a field isFinished to OperatorState so that we 
> do not re-run finished sources after failover. However, this would require a 
> new version of the checkpoint meta. Currently Flink has an abstract 
> MetaV2V3SerializerBase, with V2 and V3 extending it to share some 
> implementation. To add a V4 that differs from V3 in only one field, the 
> current PoC wants to introduce a new MetaV3V4SerializerBase extending 
> MetaV2V3SerializerBase to share implementation between V3 and V4. This looks 
> a little complex, and we might need a general mechanism for extending the 
> checkpoint meta format.

This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

>      4. With the change, StreamTask would have two types of subclasses 
> according to how triggerCheckpoint is implemented: source tasks, which 
> perform checkpoints immediately, and non-source tasks, which notify the 
> CheckpointBarrierHandler in some way. However, since we have multiple source 
> tasks (legacy and new source) and multiple non-source tasks (one-input, 
> two-input, multiple-input), this would cause multiple subclasses to share 
> the same implementation and lead to code repetition. Currently the PoC 
> introduces a new level of abstraction, namely SourceStreamTasks and 
> NonSourceStreamTasks, but what makes it more complicated is that 
> StreamIterationHead extends OneInputStreamTask but needs to perform 
> checkpoints as source tasks do.

Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. 
Are you saying we should fix that or would the new work introduce even 
more duplicate code?




-- 
Arvid Heise | Senior Java Developer

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng
