Hi Arvid,

          Many thanks for the feedback!

> For 2) the race condition, I was more thinking of still injecting the
> barrier at the source in all cases, but having some kind of short-cut to
> immediately execute the RPC inside the respective taskmanager. However,
> that may prove hard in case of dynamic scale-ins. Nevertheless, because of
> this race condition, we should still take some time to think about it as it
> effectively means we need to support handling a barrier in a finished task
> anyways. Maybe a finished task is still assigned to a TM with JM as a
> fallback?
    Regarding the faked finished tasks, I have some concerns: if the faked
finished tasks reside on the JM side, the race condition between triggering a
checkpoint and tasks getting finished would still exist; and if they reside on
the TM side, the scheduler would have to keep accounting for these tasks when a
failover happens.
    Besides, we would also need to keep the channels between the faked finished
tasks and the normal tasks to pass the checkpoint barriers. This conflicts with
the current task lifecycle, since we would have to keep channels open and send
messages after EndOfPartition is sent. If we have mixed jobs with both bounded
and unbounded sources, the leftover network channels would never get a chance
to be closed.
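To make the race more concrete, here is a minimal Python sketch of the gap between the coordinator's trigger decision and the trigger RPC reaching a task. The class and method names are hypothetical illustrations, not Flink APIs:

```python
# Sketch (hypothetical names, not Flink APIs) of the race between the
# checkpoint coordinator deciding to trigger and a source task finishing.

class Task:
    def __init__(self, name):
        self.name = name
        self.finished = False

    def trigger_checkpoint(self, checkpoint_id):
        # The trigger RPC can only inject a barrier into a running task.
        if self.finished:
            return False  # too late: the task already finished
        return True

class Coordinator:
    def __init__(self, sources):
        self.sources = sources

    def trigger(self, checkpoint_id):
        # Step 1: decide on the trigger targets from a snapshot of running sources.
        targets = [t for t in self.sources if not t.finished]
        # ... between the decision and the RPCs, a task may finish ...
        targets[0].finished = True  # simulate the race
        # Step 2: send the trigger RPCs; the one to the finished task fails.
        return {t.name: t.trigger_checkpoint(checkpoint_id) for t in targets}

coord = Coordinator([Task("S1"), Task("S2")])
print(coord.trigger(1))  # S1's trigger fails because it finished after the decision
```

This is exactly why the thread argues that a barrier arriving at (or a trigger targeting) an already-finished task must be handled somewhere, whether by a fallback on the JM or by faked finished tasks.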

   Best,
    Yun


------------------------------------------------------------------
From:Arvid Heise <ar...@ververica.com>
Send Time:2021 Jan. 6 (Wed.) 00:28
To:Yun Gao <yungao...@aliyun.com>
Cc:Aljoscha Krettek <aljos...@apache.org>; dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

For 2) the race condition, I was more thinking of still injecting the
barrier at the source in all cases, but having some kind of short-cut to
immediately execute the RPC inside the respective taskmanager. However,
that may prove hard in case of dynamic scale-ins. Nevertheless, because of
this race condition, we should still take some time to think about it as it
effectively means we need to support handling a barrier in a finished task
anyways. Maybe a finished task is still assigned to a TM with JM as a
fallback?

For your question: will there ever be intermediate operators that should be
running that are not connected to at least one source?
I think there are plenty of examples if you go beyond chained operators and
fully connected exchanges. Think of any fan-in: let's assume you have
sources S1...S4, with S1+S2->M1 and S3+S4->M2. If S1 is finished, S2 and M1
are still running. Or I didn't get your question ;).
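Arvid's fan-in example can be made concrete with a small sketch (plain Python, topology names taken from the mail; the trigger rule is a simplified model, not Flink's actual coordinator logic) that computes which tasks are still running and which ones the coordinator would now have to trigger directly:

```python
# Sketch of the fan-in topology S1+S2 -> M1, S3+S4 -> M2.
# Rule modeled here: an operator must be triggered directly by the
# coordinator once it has no inputs left that are still running.

edges = {"S1": ["M1"], "S2": ["M1"], "S3": ["M2"], "S4": ["M2"]}

def inputs_of(op):
    return [src for src, dsts in edges.items() if op in dsts]

def trigger_targets(finished):
    """Operators the coordinator must trigger: still-running sources, plus
    any running operator all of whose inputs have finished (a new 'head')."""
    all_ops = set(edges) | {d for dsts in edges.values() for d in dsts}
    running = all_ops - finished
    targets = []
    for op in sorted(running):
        ins = inputs_of(op)
        if not ins:                              # running source
            targets.append(op)
        elif all(i in finished for i in ins):    # new head of the pipeline
            targets.append(op)
    return targets

# S1 finished: M1 still receives barriers through S2, no new head yet.
print(trigger_targets({"S1"}))        # ['S2', 'S3', 'S4']
# S1 and S2 finished: M1 becomes a head and must be triggered directly.
print(trigger_targets({"S1", "S2"}))  # ['M1', 'S3', 'S4']
```

This matches the point Aljoscha makes below: once sources may shut down, intermediate operators can become the "head" of a pipeline and need direct triggering.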

On Tue, Jan 5, 2021 at 5:00 PM Yun Gao <yungao...@aliyun.com> wrote:

>      Hi Aljoscha,
>
>          Many thanks for the feedback!
>
>          For the second issue, I'm indeed thinking of the race condition
> between deciding to trigger and the operator getting finished. And for this
> point,
>
> > One thought here is this: will there ever be intermediate operators that
> > should be running that are not connected to at least one source? The
> > only case I can think of right now is async I/O. Or are there others? If
> > we think that there will never be intermediate operators that are not
> > connected to at least one source we might come up with a simpler
> > solution.
>
>      I think there are still cases where an intermediate operator keeps
> running after all of its sources have finished, for example, source -> sink
> writer -> sink committer -> sink global committer: since the sink committer
> needs to wait for one more checkpoint between endOfInput and close, it would
> continue to run after the source and the sink writer are finished, until one
> more checkpoint could be completed. And since the four operators could also
> be chained into one task, we may also need to consider the case that part of
> the operators are finished when taking a snapshot of one of the tasks.
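The sink-committer lifecycle described above can be sketched as follows (plain Python; the method names loosely mirror Flink's operator lifecycle but are illustrative, not the actual API):

```python
# Sketch of why a sink committer outlives its finished sources: it may only
# close after one more completed checkpoint has made its committables durable.

class SinkCommitter:
    def __init__(self):
        self.pending = []          # committables not yet covered by a checkpoint
        self.end_of_input = False
        self.closed = False

    def process(self, committable):
        self.pending.append(committable)

    def notify_end_of_input(self):
        # Source and writer are done, but we must NOT close yet: the pending
        # committables still need to be part of a completed checkpoint.
        self.end_of_input = True

    def notify_checkpoint_complete(self, checkpoint_id):
        commits = list(self.pending)   # commit everything covered so far
        self.pending.clear()
        if self.end_of_input:
            self.closed = True         # only now is it safe to shut down
        return commits

c = SinkCommitter()
c.process("txn-1")
c.notify_end_of_input()
print(c.closed)                         # False: waiting for one more checkpoint
print(c.notify_checkpoint_complete(1))  # ['txn-1']
print(c.closed)                         # True: safe to finish now
```

This is the window in which the committer runs with all of its sources already finished, which is exactly the case that checkpoints-after-tasks-finished must support.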
>
>    Best,
>     Yun
>
>
> ------------------------------------------------------------------
> From:Aljoscha Krettek <aljos...@apache.org>
> Send Time:2021 Jan. 5 (Tue.) 22:34
> To:dev <dev@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> On 2021/01/05 10:16, Arvid Heise wrote:
> >1. I'd think that this is an orthogonal issue, which I'd solve separately.
> >My gut feeling says that this is something we should only address for new
> >sinks where we decouple the semantics of commits and checkpoints
> >anyways. @Aljoscha
> >Krettek <aljos...@apache.org> any idea on this one?
>
> I also think it's somewhat orthogonal, let's see where we land here once
> the other issues are hammered out.
>
> >2. I'm not sure I get it completely. Let's assume we have a source
> >partition that is finished before the first checkpoint. Then, we would need
> >to store the finished state of the subtask somehow. So I'm assuming, we
> >still need to trigger some checkpointing code on finished subtasks.
>
> What he's talking about here is the race condition between a) checkpoint
> coordinator decides to do a checkpoint and b) a source operator shuts
> down.
>
> Normally, the checkpoint coordinator only needs to trigger sources, and
> not intermediate operators. When we allow sources to shut down,
> intermediate operators now can become the "head" of a pipeline and
> become the things that need to be triggered.
>
> One thought here is this: will there ever be intermediate operators that
> should be running that are not connected to at least one source? The
> only case I can think of right now is async I/O. Or are there others? If
> we think that there will never be intermediate operators that are not
> connected to at least one source we might come up with a simpler
> solution.
>
> >3. Do we really want to store the finished flag in OperatorState? I was
> >assuming we want to have it more fine-grained on OperatorSubtaskState.
> >Maybe we can store the flag inside managed or raw state without changing
> >the format?
>
> I think we cannot store it in `OperatorSubtaskState` because of how
> operator state (the actual `ListState` that operators use) is reshuffled
> on restore to all operators. So normally it doesn't make sense to say
> that one of the subtasks is done when operator state is involved. Only
> when all subtasks are done can we record this operator as done, I think.
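Aljoscha's point about the reshuffling can be illustrated with a sketch of how operator (list) state is redistributed on restore (plain Python, simplified round-robin split; the real redistribution strategy may differ):

```python
# Sketch of why a per-subtask "finished" flag is hard for operator state:
# on restore, the ListState entries of all subtasks are pooled and
# redistributed, so state written by a "finished" subtask can end up on
# any restored subtask.

def redistribute(subtask_states, new_parallelism):
    pooled = [entry for state in subtask_states for entry in state]
    new_states = [[] for _ in range(new_parallelism)]
    for i, entry in enumerate(pooled):
        new_states[i % new_parallelism].append(entry)
    return new_states

# Three subtasks before restore; suppose subtask 0 was "finished".
old = [["a1", "a2"], ["b1"], ["c1", "c2"]]
print(redistribute(old, 2))  # [['a1', 'b1', 'c2'], ['a2', 'c1']]
# Entries from every old subtask are mixed across the new subtasks, so
# "subtask 0 is done" carries no meaning after redistribution; only
# "the whole operator is done" stays well-defined.
```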
>
> Best,
> Aljoscha

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
