Hi Arvid,
Thanks a lot for the feedback!
For the second issue, sorry, I think I did not make it very clear. I was initially
thinking of the case where, for a job with the graph A -> B -> C, A is still running
when we compute which tasks to trigger, so we trigger A to start the checkpoint.
However, before the trigger message reaches A, A finishes, and the trigger fails
because the task can no longer be found. If we do not handle this case, the
checkpoint fails due to a timeout. By default a failed checkpoint causes a job
failure, and we would also have to wait one checkpoint interval for the next
checkpoint. One solution would be to check all pending checkpoints when the JM is
notified that A has finished, and trigger B instead (see the sketch below).
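In pseudo-code, the handling on the JM side could look roughly like the following
sketch (the class and method names here are only illustrative, not the actual
CheckpointCoordinator code):

```java
import java.util.*;

// Minimal sketch: when the JM is notified that a task finished, re-check every
// pending checkpoint and trigger the finished task's direct descendants instead
// of letting the trigger time out. All names are hypothetical.
public class RetriggerOnTaskFinished {

    /** checkpointId -> tasks whose trigger/ack is still outstanding. */
    private final Map<Long, Set<String>> pendingTasksPerCheckpoint = new HashMap<>();
    /** taskId -> direct downstream tasks, e.g. "A" -> ["B"] for A -> B -> C. */
    private final Map<String, List<String>> downstream = new HashMap<>();

    public void onTaskFinished(String finishedTask) {
        for (Map.Entry<Long, Set<String>> entry : pendingTasksPerCheckpoint.entrySet()) {
            Set<String> pending = entry.getValue();
            if (pending.remove(finishedTask)) {
                // The trigger message can no longer reach the finished task, so
                // re-target the trigger at its descendants (B instead of A).
                for (String next : downstream.getOrDefault(finishedTask, List.of())) {
                    pending.add(next);
                    sendTriggerRpc(next, entry.getKey());
                }
            }
        }
    }

    private void sendTriggerRpc(String taskId, long checkpointId) {
        // send the trigger RPC to the task executor (omitted in this sketch)
    }
}
```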
For the third issue, it should work if we store a special value for some field in
OperatorState or OperatorSubtaskState. For example, we might store a special
subtaskState map inside the OperatorState to mark it as finished, since a finished
operator should always have an empty state (see the sketch below). Thanks a lot for
the advice! I'll try this approach.
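Roughly, what I have in mind looks like this sketch (the names and the serialization
layout are only illustrative, not the actual OperatorState or metadata classes):

```java
import java.io.*;
import java.util.*;

// Minimal sketch: since a finished operator always has an empty state, a reserved
// value in the serialized subtask-state section could mark "finished" without
// adding a brand-new field. All names and the layout are hypothetical.
public class OperatorStateSketch {

    /** Reserved subtask count written instead of 0 when the operator has finished. */
    private static final int FINISHED_MARKER = -1;

    private final Map<Integer, byte[]> subtaskStates = new HashMap<>();
    private boolean fullyFinished;

    public void markFullyFinished() {
        if (!subtaskStates.isEmpty()) {
            throw new IllegalStateException("a finished operator must have empty state");
        }
        fullyFinished = true;
    }

    public void write(DataOutputStream out) throws IOException {
        out.writeInt(fullyFinished ? FINISHED_MARKER : subtaskStates.size());
        for (Map.Entry<Integer, byte[]> e : subtaskStates.entrySet()) {
            out.writeInt(e.getKey());
            out.writeInt(e.getValue().length);
            out.write(e.getValue());
        }
    }

    public void read(DataInputStream in) throws IOException {
        int count = in.readInt();
        fullyFinished = (count == FINISHED_MARKER);
        for (int i = 0; i < Math.max(count, 0); i++) {
            int subtask = in.readInt();
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            subtaskStates.put(subtask, bytes);
        }
    }
}
```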
Best,
Yun
------------------------------------------------------------------
From:Arvid Heise <[email protected]>
Send Time:2021 Jan. 5 (Tue.) 17:16
To:Yun Gao <[email protected]>
Cc:Aljoscha Krettek <[email protected]>; dev <[email protected]>; user
<[email protected]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Yun,
1. I'd think that this is an orthogonal issue, which I'd solve separately. My
gut feeling says that this is something we should only address for new sinks
where we decouple the semantics of commits and checkpoints anyways. @Aljoscha
Krettek any idea on this one?
2. I'm not sure I get it completely. Let's assume we have a source partition
that is finished before the first checkpoint. Then, we would need to store the
finished state of the subtask somehow. So I'm assuming, we still need to
trigger some checkpointing code on finished subtasks.
3. Do we really want to store the finished flag in OperatorState? I was
assuming we want to have it more fine-grained on OperatorSubtaskState. Maybe we
can store the flag inside managed or raw state without changing the format?
On Fri, Dec 25, 2020 at 8:39 AM Yun Gao <[email protected]> wrote:
Hi all,
I tested the previous PoC against the current tests and found some new issues that
might cause divergence; apologies that this may also reverse some of the earlier
conclusions:
1. Which operators should wait for one more checkpoint before closing?
One motivation for this FLIP is to ensure that 2PC sinks commit the last part of
data before being closed, which means the sink operator needs to wait for one more
checkpoint, roughly onEndOfInput() -> waitForCheckpoint() ->
notifyCheckpointComplete() -> close() (a sketch of this lifecycle follows the
options below). This leads to the question of which operators should wait for a
checkpoint. Possible options are:
a. Make all operators (or UDFs) that implement the notifyCheckpointComplete
method wait for one more checkpoint. One exception: since we can only snapshot
either one or all tasks of a legacy source operator to avoid data repetition [1],
we cannot support legacy source operators and their chained operators waiting for
checkpoints, because there would be a deadlock if only part of the tasks have
finished; this will eventually be resolved once the legacy sources are deprecated.
The PoC uses this option for now.
b. Make only operators (or UDFs) that implement a special marker interface wait
for one more checkpoint.
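For illustration, the lifecycle of such a sink could look roughly like the following
sketch (class and method names are illustrative, not the actual operator API):

```java
import java.util.concurrent.CompletableFuture;

// Minimal sketch of a 2PC sink that must not close before its final pre-committed
// transaction has been confirmed by one more completed checkpoint. Hypothetical names.
public class TwoPhaseCommitSinkSketch {

    private final CompletableFuture<Long> finalCommitConfirmed = new CompletableFuture<>();
    private volatile boolean inputEnded;

    public void endInput() {
        // All records received; the last transaction is pre-committed on the next
        // checkpoint and still has to be committed via notifyCheckpointComplete.
        inputEnded = true;
    }

    public void notifyCheckpointComplete(long checkpointId) {
        commitPendingTransactionsUpTo(checkpointId);
        if (inputEnded) {
            finalCommitConfirmed.complete(checkpointId);
        }
    }

    public void close() throws Exception {
        // Wait for one more checkpoint after end-of-input so the last data is committed.
        finalCommitConfirmed.get();
    }

    private void commitPendingTransactionsUpTo(long checkpointId) {
        // second phase of the 2PC protocol (omitted in this sketch)
    }
}
```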
2. Do we need to handle the case where tasks finish before being triggered?
Previously I thought we could postpone this; however, during testing I found that
it might cause problems, since by default a checkpoint failure causes a job
failover, and the job would also need to wait another interval to trigger the next
checkpoint. To pass the tests I updated the PoC to include this part, and we may
want to reconsider whether to include it or use some other option.
3. How do we extend the checkpoint meta with a new format?
Sorry, I previously gave a wrong estimate: after extracting a sub-component for
(de)serializing operator state, I found that the problem simply moves into the new
OperatorStateSerializer. The issue seems to be that v2, v3 and v4 have different
fields and therefore use different (de)serialization processes, which is a bit
different from the case where we have a fixed set of steps and each step has
version-specific logic. Thus we might either
a. use base classes shared between each pair of versions, or
b. have a unified framework that contains all the possible fields across all
versions and uses empty field serializers to skip some fields in each version (see
the sketch below).
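Option b could look roughly like this sketch (all names are illustrative, not the
real metadata serializers):

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Minimal sketch of option (b): each meta version is just a list of field
// (de)serializers over one unified meta object, and versions lacking a field plug
// in a no-op serializer instead of introducing a new subclass. Hypothetical names.
public class VersionedMetaSerializerSketch {

    interface FieldSerializer {
        void write(CheckpointMeta meta, DataOutputStream out) throws IOException;
        void read(CheckpointMeta meta, DataInputStream in) throws IOException;
    }

    /** Unified meta object containing the superset of fields across all versions. */
    static class CheckpointMeta {
        long checkpointId;
        boolean operatorsFullyFinished; // only written from v4 on
    }

    static final FieldSerializer CHECKPOINT_ID = new FieldSerializer() {
        public void write(CheckpointMeta m, DataOutputStream out) throws IOException {
            out.writeLong(m.checkpointId);
        }
        public void read(CheckpointMeta m, DataInputStream in) throws IOException {
            m.checkpointId = in.readLong();
        }
    };

    static final FieldSerializer FINISHED_FLAG = new FieldSerializer() {
        public void write(CheckpointMeta m, DataOutputStream out) throws IOException {
            out.writeBoolean(m.operatorsFullyFinished);
        }
        public void read(CheckpointMeta m, DataInputStream in) throws IOException {
            m.operatorsFullyFinished = in.readBoolean();
        }
    };

    /** Empty field serializer used by versions that do not contain a given field. */
    static final FieldSerializer SKIP = new FieldSerializer() {
        public void write(CheckpointMeta m, DataOutputStream out) {}
        public void read(CheckpointMeta m, DataInputStream in) {}
    };

    static final List<FieldSerializer> V3_FIELDS = List.of(CHECKPOINT_ID, SKIP);
    static final List<FieldSerializer> V4_FIELDS = List.of(CHECKPOINT_ID, FINISHED_FLAG);

    static void serialize(List<FieldSerializer> fields, CheckpointMeta meta,
                          DataOutputStream out) throws IOException {
        for (FieldSerializer f : fields) {
            f.write(meta, out);
        }
    }

    static void deserialize(List<FieldSerializer> fields, CheckpointMeta meta,
                            DataInputStream in) throws IOException {
        for (FieldSerializer f : fields) {
            f.read(meta, in);
        }
    }
}
```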
Best,
Yun
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks
------------------------------------------------------------------
From:Yun Gao <[email protected]>
Send Time:2020 Dec. 16 (Wed.) 11:07
To:Aljoscha Krettek <[email protected]>; dev <[email protected]>; user
<[email protected]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Aljoscha,
Thanks a lot for the feedback! For the remaining issues:
> 1. You mean we would insert "artificial" barriers for barrier 2 in case
we receive EndOfPartition while other inputs have already received barrier 2?
I think that makes sense, yes.
Yes, exactly. I would like to insert "artificial" barriers in case we receive
EndOfPartition while other inputs have already received barrier 2, and also for
the similar cases where some input channels receive EndOfPartition while
checkpoint 2 is ongoing, or where the task receives the checkpoint trigger
directly after all the precedent tasks have finished but before receiving their
EndOfPartition (see the sketch below).
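For illustration, the handling on the input side could look roughly like this
sketch (the class and the channel numbering are only illustrative, not the actual
CheckpointBarrierHandler code):

```java
import java.util.*;

// Minimal sketch of the "artificial barrier" idea: if a channel delivers
// EndOfPartition while other channels have already seen barrier N, behave as if
// barrier N had arrived on that channel first, so alignment for checkpoint N can
// still complete. All names are hypothetical.
public class ArtificialBarrierSketch {

    private long pendingCheckpointId = -1;
    private final Set<Integer> alignedChannels = new HashSet<>();
    private final Set<Integer> openChannels = new HashSet<>(Set.of(0, 1, 2, 3));

    public void onBarrier(int channel, long checkpointId) {
        pendingCheckpointId = checkpointId;
        alignedChannels.add(channel);
        maybeCompleteAlignment();
    }

    public void onEndOfPartition(int channel) {
        if (pendingCheckpointId >= 0 && !alignedChannels.contains(channel)) {
            // Insert an artificial barrier for the pending checkpoint before
            // processing EndOfPartition, so the alignment does not get stuck.
            onBarrier(channel, pendingCheckpointId);
        }
        openChannels.remove(channel);
        maybeCompleteAlignment();
    }

    private void maybeCompleteAlignment() {
        if (pendingCheckpointId >= 0 && alignedChannels.containsAll(openChannels)) {
            // trigger the task-local snapshot for pendingCheckpointId (omitted)
            alignedChannels.clear();
            pendingCheckpointId = -1;
        }
    }
}
```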
> 3. This indeed seems complex. Maybe we could switch to using composition
instead of inheritance to make this more extensible?
I re-checked the code and now think composition would be better: it avoids a
complex inheritance hierarchy by exposing the changed part,
`(de)serializeOperatorState`, and I'll update the PoC to change this part. Thanks
a lot for the suggestion!
> 4. Don't we currently have the same problem? Even right now source tasks
and non-source tasks behave differently when it comes to checkpoints. Are you
saying we should fix that or would the new work introduce even
more duplicate code?
Currently, since we never trigger non-source tasks, the triggerCheckpoint logic is
implemented in the base StreamTask class and used only by the source tasks.
However, after this change the non-source tasks would also get triggered, with a
different behavior, so we might not be able to keep this pattern (see the sketch
below).
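For illustration, the two triggering behaviors could look roughly like this sketch
(the class names are illustrative, not the actual StreamTask hierarchy):

```java
import java.util.concurrent.CompletableFuture;

// Minimal sketch: a source task snapshots immediately when the JM triggers it,
// while a triggered non-source task forwards the trigger to its barrier handler so
// it behaves as if the barrier had arrived from all of its already-finished inputs.
// All names are hypothetical.
interface TriggerableTaskSketch {
    CompletableFuture<Boolean> triggerCheckpoint(long checkpointId);
}

class SourceTaskSketch implements TriggerableTaskSketch {
    @Override
    public CompletableFuture<Boolean> triggerCheckpoint(long checkpointId) {
        // Source tasks have no upstream barriers, so snapshot right away.
        return CompletableFuture.completedFuture(snapshotState(checkpointId));
    }

    private boolean snapshotState(long checkpointId) {
        return true; // state snapshot omitted in this sketch
    }
}

class NonSourceTaskSketch implements TriggerableTaskSketch {
    @Override
    public CompletableFuture<Boolean> triggerCheckpoint(long checkpointId) {
        // All precedent tasks have finished, so no real barriers will arrive any more;
        // notify the barrier handler as if the barrier were inserted on every input.
        notifyBarrierHandler(checkpointId);
        return CompletableFuture.completedFuture(true);
    }

    private void notifyBarrierHandler(long checkpointId) {
        // hand over to the task's CheckpointBarrierHandler (omitted in this sketch)
    }
}
```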
Best,
Yun
------------------------------------------------------------------
From:Aljoscha Krettek <[email protected]>
Send Time:2020 Dec. 15 (Tue.) 18:11
To:dev <[email protected]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Thanks for the thorough update! I'll answer inline.
On 14.12.20 16:33, Yun Gao wrote:
> 1. To include EndOfPartition in the barrier alignment on the TM side, we now tend
> to decouple the EndOfPartition logic from the normal alignment behavior to avoid
> complex interference (which seems hard to track). We could do so by inserting
> suitable barriers for input channels that have received but not yet processed
> EndOfPartition. For example, if a task with four inputs has received barrier 2
> from two input channels, but the other two inputs do not receive barrier 2 before
> EndOfPartition because the precedent tasks have finished, we could insert barrier
> 2 for those last two channels so that checkpoint 2 can still complete.
You mean we would insert "artificial" barriers for barrier 2 in case we
receive EndOfPartition while other inputs have already received barrier
2? I think that makes sense, yes.
> 2. As we have discussed, if a task finishes while we are triggering the tasks, it
> causes a checkpoint failure and we should re-trigger its descendants. But if
> possible we think we might skip this issue in the first version to reduce
> implementation complexity, since it should not affect correctness. We could
> consider supporting it in the following versions.
I think this should be completely fine.
> 3. We would have to add an isFinished field to OperatorState so that we do not
> re-run finished sources after a failover. However, this requires a new version of
> the checkpoint meta. Currently Flink has an abstract MetaV2V3SerializerBase, with
> V2 and V3 extending it to share some implementation. To add V4, which differs
> from V3 in only one field, the current PoC introduces a new MetaV3V4SerializerBase
> extending MetaV2V3SerializerBase to share implementation between V3 and V4. This
> looks a little complex, and we might need a general mechanism for extending the
> checkpoint meta format.
This indeed seems complex. Maybe we could switch to using composition
instead of inheritance to make this more extensible?
> 4. With this change, StreamTask would have two types of subclasses according to
> how triggerCheckpoint is implemented: source tasks, which perform checkpoints
> immediately, and non-source tasks, which notify the CheckpointBarrierHandler in
> some way. However, since we have multiple source tasks (legacy and new source)
> and multiple non-source tasks (one-input, two-input, multiple-input), multiple
> subclasses would share the same implementation, causing code repetition. The
> current PoC introduces a new level of abstraction, namely SourceStreamTasks and
> NonSourceStreamTasks, but what makes it more complicated is that
> StreamingIterationHead extends OneInputStreamTask yet needs to perform
> checkpoints like a source task.
Don't we currently have the same problem? Even right now source tasks
and non-source tasks behave differently when it comes to checkpoints.
Are you saying we should fix that or would the new work introduce even
more duplicate code?
--
Arvid Heise | Senior Java Developer