Hi all,
I would like to resume this discussion on supporting checkpoints after
tasks have finished :) Based on the previous discussion, we have implemented a
PoC [1] to try out the idea. While working on the PoC we ran into some possible
issues:
1. To take EndOfPartition into account for barrier alignment on the TM
side, we now tend to decouple the EndOfPartition logic from the normal
alignment behavior to avoid complex interference between the two (which seems
hard to keep track of). We could do so by inserting suitable barriers for input
channels that have received EndOfPartition but not yet processed it. For
example, if a task with four inputs has received barrier 2 from two input
channels, but the other two inputs do not receive barrier 2 before
EndOfPartition because the preceding tasks have finished, we could then insert
barrier 2 for the last two channels so that we can still complete checkpoint 2.
2. As we have discussed, if a task finishes while we are triggering the
tasks, it would cause a checkpoint failure and we should re-trigger its
descendants. But if possible, we think we might skip this issue in the first
version to reduce the implementation complexity, since it should not affect
correctness. We could consider supporting it in later versions.
3. We would have to add a field isFinished to OperatorState so that we
do not re-run finished sources after failover. However, this would require a
new version of the checkpoint meta. Currently Flink has an abstract
MetaV2V3SerializerBase, and V2 and V3 extend it to share some
implementation. To add V4, which differs from V3 in only one field, the
current PoC wants to introduce a new MetaV3V4SerializerBase that extends
MetaV2V3SerializerBase to share implementation between V3 and V4. This might
look a little complex, and we might need a general mechanism for extending the
checkpoint meta format.
4. With this change, StreamTask would have two types of subclasses according
to how they implement triggerCheckpoint: source tasks, which perform
checkpoints immediately, and non-source tasks, which would notify the
CheckpointBarrierHandler in some way. However, since we have multiple kinds of
source tasks (legacy and new source) and multiple kinds of non-source tasks
(one-input, two-input, multiple-input), multiple subclasses would end up
sharing the same implementation, which leads to code repetition. Currently the
PoC introduces a new level of abstraction, namely SourceStreamTasks and
NonSourceStreamTasks, but what makes it more complicated is that
StreamingIterationHead extends OneInputStreamTask yet needs to perform
checkpoints the way source tasks do.
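
To make issue 1 a bit more concrete, here is a minimal sketch of the
barrier-insertion idea. It is not the PoC code: the class
EndOfPartitionAlignmentSketch and all of its methods are made-up names for
illustration only, and the real logic would sit in CheckpointBarrierHandler
and have to deal with much more than this. The only assumption it encodes is
that a channel which has reached EndOfPartition is treated as if it had
delivered the barrier of the pending checkpoint.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative only; hypothetical names, not Flink's actual classes. */
class EndOfPartitionAlignmentSketch {
    private final int numChannels;
    // Channels that have already reached EndOfPartition.
    private final Set<Integer> finishedChannels = new HashSet<>();
    // Channels that delivered, or were given, the barrier of the pending checkpoint.
    private final Set<Integer> alignedChannels = new HashSet<>();
    private long pendingCheckpointId = -1;
    private long lastCompletedCheckpointId = -1;

    EndOfPartitionAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A real barrier arrived on the given channel. */
    void onBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastCompletedCheckpointId || checkpointId < pendingCheckpointId) {
            return; // stale barrier, ignore
        }
        if (checkpointId > pendingCheckpointId) {
            // Start a new alignment and insert the barrier for every
            // channel that finished before this checkpoint started.
            pendingCheckpointId = checkpointId;
            alignedChannels.clear();
            alignedChannels.addAll(finishedChannels);
        }
        alignedChannels.add(channel);
        maybeComplete();
    }

    /** EndOfPartition arrived on the given channel. */
    void onEndOfPartition(int channel) {
        finishedChannels.add(channel);
        if (pendingCheckpointId > lastCompletedCheckpointId) {
            // An alignment is in progress: act as if the barrier had arrived.
            alignedChannels.add(channel);
            maybeComplete();
        }
    }

    private void maybeComplete() {
        if (pendingCheckpointId > lastCompletedCheckpointId
                && alignedChannels.size() == numChannels) {
            lastCompletedCheckpointId = pendingCheckpointId;
            alignedChannels.clear();
        }
    }

    long getLastCompletedCheckpointId() {
        return lastCompletedCheckpointId;
    }
}
```

With the four-input example from issue 1, calling onBarrier(0, 2) and
onBarrier(1, 2) followed by onEndOfPartition(2) and onEndOfPartition(3) lets
checkpoint 2 complete, since the two finished channels count as aligned.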
Glad to hear your opinions!
Best,
Yun
[1] https://github.com/gaoyunhaii/flink/commits/try_checkpoint_6 , starting
from commit f8005be1ab5e5124e981e56db7bdf2908f4a969a.