Hi Aljoscha,

Many thanks for the feedback! For the remaining issues:

> 1. You mean we would insert "artificial" barriers for barrier 2 in case
> we receive EndOfPartition while other inputs have already received
> barrier 2? I think that makes sense, yes.

Yes, exactly. I would like to insert "artificial" barriers in the case where
we receive EndOfPartition while other inputs have already received barrier 2,
and also in the similar cases: when some input channels receive EndOfPartition
while checkpoint 2 is ongoing, and when the task is triggered for a checkpoint
directly after all of its upstream tasks have finished but before it has
received their EndOfPartition.
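To make the three cases concrete, here is a rough, single-checkpoint sketch of the idea. It is not Flink's CheckpointBarrierHandler; the class ArtificialBarrierSketch and its methods are hypothetical names for illustration only. EndOfPartition on a channel counts as that channel's barrier for the pending checkpoint, channels that finished earlier count as aligned as soon as a checkpoint starts, and a direct trigger just opens the alignment and waits for the remaining EndOfPartition events.

```java
import java.util.BitSet;

/**
 * Hypothetical sketch, not Flink's CheckpointBarrierHandler: aligns one checkpoint
 * at a time and treats EndOfPartition as an "artificial" barrier for that checkpoint.
 */
public class ArtificialBarrierSketch {
    private final int numChannels;
    private final BitSet finished = new BitSet(); // channels that delivered EndOfPartition
    private final BitSet aligned = new BitSet();  // channels counted as aligned for the pending checkpoint
    private long pendingCheckpoint = -1;          // -1: no alignment in progress
    private long lastCompleted = -1;

    public ArtificialBarrierSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A real barrier arrives on one input channel. */
    public void onBarrier(int channel, long checkpointId) {
        if (isStale(checkpointId)) {
            return;
        }
        startIfNeeded(checkpointId);
        aligned.set(channel);
        maybeComplete();
    }

    /** EndOfPartition arrives: it stands in for the barrier of the pending checkpoint. */
    public void onEndOfPartition(int channel) {
        finished.set(channel);
        if (pendingCheckpoint >= 0) {
            aligned.set(channel);
            maybeComplete();
        }
    }

    /** The task is triggered directly, e.g. because all upstream tasks have finished. */
    public void onTrigger(long checkpointId) {
        if (isStale(checkpointId)) {
            return;
        }
        startIfNeeded(checkpointId);
        maybeComplete();
    }

    // Ignore barriers/triggers for completed checkpoints or ones older than the pending one.
    private boolean isStale(long checkpointId) {
        return checkpointId <= lastCompleted || checkpointId < pendingCheckpoint;
    }

    private void startIfNeeded(long checkpointId) {
        if (pendingCheckpoint != checkpointId) {
            // A newer checkpoint simply replaces (aborts) an older pending one here.
            pendingCheckpoint = checkpointId;
            aligned.clear();
            aligned.or(finished); // already-finished channels get an artificial barrier at once
        }
    }

    private void maybeComplete() {
        if (aligned.cardinality() == numChannels) {
            lastCompleted = pendingCheckpoint;
            pendingCheckpoint = -1;
            aligned.clear();
        }
    }

    public long lastCompletedCheckpoint() {
        return lastCompleted;
    }
}
```

With four channels, barrier 2 on channels 0 and 1 followed by EndOfPartition on channels 2 and 3 completes checkpoint 2, which is exactly the example from point 1.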

> 3. This indeed seems complex. Maybe we could switch to using composition
> instead of inheritance to make this more extensible?

I re-checked the code, and I now think composition would be better: it avoids
a complex inheritance hierarchy by factoring out the part that changes,
`(de)serializeOperatorState`. I'll update the PoC accordingly.
Many thanks for the suggestion!
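Roughly, I have something like the following shape in mind. This is only a simplified sketch: OpState is a toy stand-in for OperatorState, and OperatorStateCodec, V3Codec, V4Codec and MetadataSerializer are hypothetical names, not existing Flink classes. The version-specific operator-state (de)serialization is a pluggable component, V4 reuses V3 by delegation, and the version-independent parts are written once.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of composition for versioned checkpoint metadata (not Flink classes). */
public class ComposedMetadataSketch {

    /** Toy stand-in for OperatorState with the proposed isFinished flag. */
    public static final class OpState {
        public final String operatorId;
        public final int parallelism;
        public final boolean finished;

        public OpState(String operatorId, int parallelism, boolean finished) {
            this.operatorId = operatorId;
            this.parallelism = parallelism;
            this.finished = finished;
        }
    }

    /** The only version-specific part: how one operator state is (de)serialized. */
    public interface OperatorStateCodec {
        void serialize(OpState state, DataOutputStream out) throws IOException;
        OpState deserialize(DataInputStream in) throws IOException;
    }

    public static final class V3Codec implements OperatorStateCodec {
        public void serialize(OpState state, DataOutputStream out) throws IOException {
            out.writeUTF(state.operatorId);
            out.writeInt(state.parallelism);
        }

        public OpState deserialize(DataInputStream in) throws IOException {
            String id = in.readUTF();
            int parallelism = in.readInt();
            return new OpState(id, parallelism, false); // V3 has no isFinished flag
        }
    }

    /** V4 = V3 layout plus one flag; it reuses V3 by delegation, not via a shared base class. */
    public static final class V4Codec implements OperatorStateCodec {
        private final V3Codec v3 = new V3Codec();

        public void serialize(OpState state, DataOutputStream out) throws IOException {
            v3.serialize(state, out);
            out.writeBoolean(state.finished);
        }

        public OpState deserialize(DataInputStream in) throws IOException {
            OpState base = v3.deserialize(in);
            return new OpState(base.operatorId, base.parallelism, in.readBoolean());
        }
    }

    /** Version-independent logic (here just a header and a count) parameterized with a codec. */
    public static final class MetadataSerializer {
        private final int version;
        private final OperatorStateCodec codec;

        public MetadataSerializer(int version, OperatorStateCodec codec) {
            this.version = version;
            this.codec = codec;
        }

        public byte[] serialize(List<OpState> states) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(version);
            out.writeInt(states.size());
            for (OpState state : states) {
                codec.serialize(state, out);
            }
            out.flush();
            return bytes.toByteArray();
        }

        public List<OpState> deserialize(byte[] data) throws IOException {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int actual = in.readInt();
            if (actual != version) {
                throw new IOException("unexpected metadata version " + actual);
            }
            int count = in.readInt();
            List<OpState> states = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                states.add(codec.deserialize(in));
            }
            return states;
        }
    }
}
```

Adding a future V5 would then mean writing one more codec rather than another intermediate base class.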

> 4. Don't we currently have the same problem? Even right now source tasks
> and non-source tasks behave differently when it comes to checkpoints. Are
> you saying we should fix that or would the new work introduce even more
> duplicate code?

Currently, since non-source tasks are never triggered, the triggerCheckpoint
logic is implemented in the base StreamTask class and is only used by the
source tasks. After the change, however, non-source tasks would also be
triggered, but with different behavior, so we might not be able to keep
using this pattern.
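One possible direction, again only a sketch: the reaction to a direct trigger could be injected as a strategy instead of being fixed by an extra SourceStreamTasks/NonSourceStreamTasks level. All names below (TriggerStrategy, SnapshotImmediately, NotifyBarrierHandler, Task, SourceTask, OneInputTask, IterationHead) are hypothetical toy classes that only mirror the roles of the real StreamTask subclasses, and trigger() merely returns a description of what the real code would do.

```java
/**
 * Hypothetical sketch: the reaction to a direct checkpoint trigger is injected as a
 * strategy instead of being fixed by an extra level in the task hierarchy.
 * None of these classes are Flink classes; trigger() returns a description only.
 */
public class TriggerStrategySketch {

    public interface TriggerStrategy {
        String trigger(long checkpointId);
    }

    /** Source-style: snapshot state right away, then emit the barrier downstream. */
    public static final class SnapshotImmediately implements TriggerStrategy {
        public String trigger(long checkpointId) {
            return "snapshot and emit barrier " + checkpointId;
        }
    }

    /** Non-source style: hand the trigger to the barrier handler, which aligns it with the inputs. */
    public static final class NotifyBarrierHandler implements TriggerStrategy {
        public String trigger(long checkpointId) {
            return "notify barrier handler " + checkpointId;
        }
    }

    public abstract static class Task {
        private final TriggerStrategy strategy;

        protected Task(TriggerStrategy strategy) {
            this.strategy = strategy;
        }

        public final String triggerCheckpoint(long checkpointId) {
            return strategy.trigger(checkpointId);
        }
    }

    public static class SourceTask extends Task {
        public SourceTask() {
            super(new SnapshotImmediately());
        }
    }

    public static class OneInputTask extends Task {
        public OneInputTask() {
            this(new NotifyBarrierHandler());
        }

        protected OneInputTask(TriggerStrategy strategy) {
            super(strategy);
        }
    }

    /** Keeps its one-input structure but checkpoints like a source, as the iteration head must. */
    public static class IterationHead extends OneInputTask {
        public IterationHead() {
            super(new SnapshotImmediately());
        }
    }
}
```

The point of the design is that the iteration head can keep extending the one-input task while still borrowing the source behavior, without duplicating it.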

Best,
Yun


------------------------------------------------------------------
From:Aljoscha Krettek <aljos...@apache.org>
Send Time:2020 Dec. 15 (Tue.) 18:11
To:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>      1. To take EndOfPartition into account for barrier alignment on the TM
> side, we now tend to decouple the EndOfPartition logic from the normal
> alignment behavior to avoid complex interference (which seems hard to
> track). We could do so by inserting suitable barriers for input channels
> that have received but not yet processed EndOfPartition. For example, if a
> task with four inputs has received barrier 2 from two input channels, but
> the other two inputs did not receive barrier 2 before EndOfPartition because
> the upstream tasks have finished, we could insert barrier 2 for the last two
> channels so that checkpoint 2 can still complete.

You mean we would insert "artificial" barriers for barrier 2 in case we 
receive EndOfPartition while other inputs have already received barrier 
2? I think that makes sense, yes.

>      2. As we have discussed, if a task finishes while we are triggering the
> tasks, the checkpoint would fail and we should re-trigger its descendants.
> But if possible, we would like to skip this issue in the first version to
> reduce implementation complexity, since it does not affect correctness. We
> could consider supporting it in later versions.

I think this should be completely fine.

>      3. We would have to add a field isFinished to OperatorState so that
> finished sources are not re-run after failover. However, this would require
> a new version of the checkpoint metadata. Currently Flink has an abstract
> MetaV2V3SerializerBase, which V2 and V3 extend to share some
> implementation. To add V4, which differs from V3 in only one field, the
> current PoC introduces a new MetaV3V4SerializerBase that extends
> MetaV2V3SerializerBase to share implementation between V3 and V4. This
> looks a little complex, and we might need a general mechanism to extend the
> checkpoint metadata format.

This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

>      4. With the change, StreamTask would have two types of subclasses
> depending on how triggerCheckpoint is implemented: source tasks, which
> perform checkpoints immediately, and non-source tasks, which would notify
> the CheckpointBarrierHandler in some way. However, since we have multiple
> source tasks (legacy and new source) and multiple non-source tasks
> (one-input, two-input, multiple-input), several subclasses would end up
> sharing the same implementation, causing code repetition. Currently the PoC
> introduces a new level of abstraction, namely SourceStreamTasks and
> NonSourceStreamTasks, but what makes it more complicated is that
> StreamingIterationHead extends OneInputStreamTask yet needs to perform
> checkpoints like a source task.

Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. 
Are you saying we should fix that or would the new work introduce even 
more duplicate code?
