Hi Aljoscha, 

Thanks a lot for the feedback!

For the second issue, I am indeed thinking of the race condition between
the checkpoint coordinator deciding to trigger a checkpoint and an operator
finishing. Regarding this point:
> One thought here is this: will there ever be intermediate operators that
> should be running that are not connected to at least one source? The
> only case I can think of right now is async I/O. Or are there others? If
> we think that there will never be intermediate operators that are not
> connected to at least one source we might come up with a simpler
> solution.
I think there are still cases where intermediate operators keep running
after all of their sources have finished. For example, in
source -> sink writer -> sink committer -> sink global committer, the sink
committer needs to wait for one more checkpoint between endOfInput and
close, so it would continue to run after the source and the sink writer have
finished, until one more checkpoint completes. And since the four operators
could also be chained in one task, we may also need to consider the case
where some of the operators in a task have already finished when the task
takes a snapshot.
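
To make the chained case concrete, here is a minimal sketch in Python (not Flink code). It assumes a snapshot of a chained task records, per operator, either its state or a marker that the operator has already finished. The `FINISHED` marker, the `snapshot_chain` function and the state strings are hypothetical names used only for illustration.

```python
from typing import Dict, List, Tuple

# Hypothetical marker for an operator that finished before the snapshot.
# This is an illustration, not a Flink constant.
FINISHED = "FINISHED"


def snapshot_chain(chain: List[Tuple[str, bool, str]]) -> Dict[str, str]:
    """Snapshot a chained task.

    Each entry of `chain` is (operator name, already finished?, current state).
    Finished operators are recorded with the FINISHED marker instead of state.
    """
    return {
        name: (FINISHED if finished else state)
        for name, finished, state in chain
    }


# The chain from the example: source and sink writer are done, while the
# committers are still waiting for one more checkpoint.
chain = [
    ("source", True, ""),
    ("sink-writer", True, ""),
    ("sink-committer", False, "pending-commits"),
    ("sink-global-committer", False, "pending-global-commits"),
]
snapshot = snapshot_chain(chain)
```

In this sketch the snapshot of one task mixes finished and running operators, which is the situation the checkpoint format would have to represent.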

   Best,
    Yun



------------------------------------------------------------------
From:Aljoscha Krettek <aljos...@apache.org>
Send Time:2021 Jan. 5 (Tue.) 22:34
To:dev <dev@flink.apache.org>
Cc:Yun Gao <yungao...@aliyun.com>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

On 2021/01/05 10:16, Arvid Heise wrote:
>1. I'd think that this is an orthogonal issue, which I'd solve separately.
>My gut feeling says that this is something we should only address for new
>sinks where we decouple the semantics of commits and checkpoints
>anyways. @Aljoscha
>Krettek <aljos...@apache.org> any idea on this one?

I also think it's somewhat orthogonal, let's see where we land here once 
the other issues are hammered out.

>2. I'm not sure I get it completely. Let's assume we have a source
>partition that is finished before the first checkpoint. Then, we would need
>to store the finished state of the subtask somehow. So I'm assuming, we
>still need to trigger some checkpointing code on finished subtasks.

What he's talking about here is the race condition between a) checkpoint 
coordinator decides to do a checkpoint and b) a source operator shuts 
down.

Normally, the checkpoint coordinator only needs to trigger sources, and 
not intermediate operators. When we allow sources to shut down, 
intermediate operators now can become the "head" of a pipeline and 
become the things that need to be triggered.
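
As a rough illustration of that shift, the following Python sketch (not Flink code) picks the tasks a coordinator would have to trigger: every running task whose upstream tasks have all finished. The function name `tasks_to_trigger`, the dictionary-based graph and the task names are assumptions made for this example. The race described above corresponds to the `finished` set being stale by the time the trigger message arrives.

```python
from typing import Dict, List, Set


def tasks_to_trigger(upstreams: Dict[str, List[str]], finished: Set[str]) -> List[str]:
    """Return the running tasks that currently act as the "head" of the pipeline.

    `upstreams` maps each task to the tasks it reads from (empty for sources).
    A task is a head if it is still running and all of its upstreams finished.
    """
    return sorted(
        task
        for task, ups in upstreams.items()
        if task not in finished and all(u in finished for u in ups)
    )


# Hypothetical linear pipeline: source -> map -> sink.
graph = {"source": [], "map": ["source"], "sink": ["map"]}

heads_initially = tasks_to_trigger(graph, set())
heads_after_source_done = tasks_to_trigger(graph, {"source"})
```

With nothing finished only the source is triggered; once the source has shut down, the `map` task becomes the head and has to be triggered instead.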

One thought here is this: will there ever be intermediate operators that 
should be running that are not connected to at least one source? The 
only case I can think of right now is async I/O. Or are there others? If 
we think that there will never be intermediate operators that are not 
connected to at least one source we might come up with a simpler 
solution.

>3. Do we really want to store the finished flag in OperatorState? I was
>assuming we want to have it more fine-grained on OperatorSubtaskState.
>Maybe we can store the flag inside managed or raw state without changing
>the format?

I think we cannot store it in `OperatorSubtaskState` because of how 
operator state (the actual `ListState` that operators use) is reshuffled 
on restore to all operators. So normally it doesn't make sense to say 
that one of the subtasks is done when operator state is involved. Only 
when all subtasks are done can we record this operator as done, I think.
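
A minimal Python sketch of that rule, assuming a per-subtask finished flag is available when the checkpoint is assembled (the function name `operator_finished` and the list-of-flags input are hypothetical, not part of Flink):

```python
from typing import List


def operator_finished(subtask_finished: List[bool]) -> bool:
    """An operator counts as finished only if every one of its subtasks is.

    An operator with no subtasks is treated as not finished in this sketch.
    """
    return len(subtask_finished) > 0 and all(subtask_finished)


# One subtask still running: the operator as a whole is not recorded as done.
partially_done = operator_finished([True, False, True])
fully_done = operator_finished([True, True, True])
```

Recording the flag at operator level avoids having to decide what a "finished" subtask means after operator state has been redistributed on restore.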

Best,
Aljoscha
