Thanks for the thorough update! I'll answer inline.
On 14.12.20 16:33, Yun Gao wrote:
1. To take EndOfPartition into consideration for barrier alignment on
the TM side, we now tend to decouple the logic for EndOfPartition from the
normal alignment behavior to avoid complex interference (which seems hard
to keep track of). We could do so by inserting suitable barriers for input
channels that have received but not yet processed EndOfPartition. For example,
if a task with four inputs has received barrier 2 from two input channels, but
the other two inputs did not receive barrier 2 before EndOfPartition because
the preceding tasks have finished, we could then insert barrier 2 for the last
two channels so that we could still finish checkpoint 2.
You mean we would insert "artificial" barriers for checkpoint 2 in case we
receive EndOfPartition while other inputs have already received barrier
2? I think that makes sense, yes.
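
Just to check my understanding, here is a rough sketch of how I read the
proposal. AlignmentSketch and all of its methods are hypothetical names for
illustration, not Flink's actual CheckpointBarrierHandler, and I'm assuming a
single pending checkpoint at a time:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, heavily simplified alignment tracker (not Flink code). */
class AlignmentSketch {
    private final int numChannels;
    // Channels that delivered, or were artificially given, the pending barrier.
    private final Set<Integer> aligned = new HashSet<>();
    // Channels that have already delivered EndOfPartition.
    private final Set<Integer> finished = new HashSet<>();
    private long pendingId = -1;
    private long lastCompletedId = -1;

    AlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    void onBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastCompletedId || checkpointId < pendingId) {
            return; // stale barrier, ignore
        }
        if (checkpointId > pendingId) {
            // A new checkpoint starts: finished channels can never send its
            // barrier, so count them as aligned right away.
            pendingId = checkpointId;
            aligned.clear();
            aligned.addAll(finished);
        }
        aligned.add(channel);
        maybeComplete();
    }

    void onEndOfPartition(int channel) {
        finished.add(channel);
        if (pendingId > lastCompletedId) {
            // Alignment is in progress: insert the "artificial" barrier.
            aligned.add(channel);
            maybeComplete();
        }
    }

    private void maybeComplete() {
        if (aligned.size() == numChannels) {
            lastCompletedId = pendingId;
            aligned.clear();
        }
    }

    long getLastCompletedId() {
        return lastCompletedId;
    }
}
```

With four channels, barrier 2 on channels 0 and 1 followed by EndOfPartition
on channels 2 and 3 would complete checkpoint 2 in this sketch, which is the
case from your example.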
2. As we have discussed, if a task finishes while we are triggering the
tasks, it would cause a checkpoint failure and we should re-trigger its
descendants. But if possible, we think we might skip this issue in the first
version to reduce the implementation complexity, since it should not affect
correctness. We could consider supporting it in following versions.
I think this should be completely fine.
3. We would have to add a field isFinished to OperatorState so that we
do not re-run finished sources after failover. However, this would require a
new version of the checkpoint meta. Currently Flink has an abstract
MetaV2V3SerializerBase and has V2 and V3 extend it to share some
implementation. To add V4, which differs from V3 by only one field, the
current PoC wants to introduce a new MetaV3V4SerializerBase extending
MetaV2V3SerializerBase to share implementation between V3 and V4. This might
look a little complex, and we might need a general mechanism to extend the
checkpoint meta format.
This indeed seems complex. Maybe we could switch to using composition
instead of inheritance to make this more extensible?
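
To illustrate what I mean by composition, here is a minimal sketch. All of
the types (OperatorStateSketch, StateCodec, V3Codec, V4Codec, CodecDemo) are
made up for this mail and the field layout is an assumption, not the real
meta format. The point is only that V4 wraps V3 and appends its one extra
field instead of adding another base class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for OperatorState with the proposed flag. */
final class OperatorStateSketch {
    final String operatorId;
    final int parallelism;
    final boolean finished;

    OperatorStateSketch(String operatorId, int parallelism, boolean finished) {
        this.operatorId = operatorId;
        this.parallelism = parallelism;
        this.finished = finished;
    }
}

/** Hypothetical per-version codec interface. */
interface StateCodec {
    void write(OperatorStateSketch s, DataOutputStream out) throws IOException;

    OperatorStateSketch read(DataInputStream in) throws IOException;
}

/** Assumed V3 layout: id and parallelism only. */
final class V3Codec implements StateCodec {
    public void write(OperatorStateSketch s, DataOutputStream out) throws IOException {
        out.writeUTF(s.operatorId);
        out.writeInt(s.parallelism);
    }

    public OperatorStateSketch read(DataInputStream in) throws IOException {
        String id = in.readUTF();
        int parallelism = in.readInt();
        return new OperatorStateSketch(id, parallelism, false); // V3 has no flag
    }
}

/** V4 delegates to a base codec and appends the isFinished flag. */
final class V4Codec implements StateCodec {
    private final StateCodec base;

    V4Codec(StateCodec base) {
        this.base = base;
    }

    public void write(OperatorStateSketch s, DataOutputStream out) throws IOException {
        base.write(s, out);
        out.writeBoolean(s.finished);
    }

    public OperatorStateSketch read(DataInputStream in) throws IOException {
        OperatorStateSketch b = base.read(in);
        return new OperatorStateSketch(b.operatorId, b.parallelism, in.readBoolean());
    }
}

final class CodecDemo {
    /** Serializes and deserializes one state object with the given codec. */
    static OperatorStateSketch roundTrip(StateCodec codec, OperatorStateSketch s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            codec.write(s, new DataOutputStream(bytes));
            return codec.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A later V5 could then wrap V4Codec in the same way, without touching the
existing classes.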
4. With the change, StreamTask would have two types of subclasses according
to how they implement triggerCheckpoint: source tasks, which perform
checkpoints immediately, and non-source tasks, which would notify
CheckpointBarrierHandler in some way. However, since we have multiple source
tasks (legacy and new source) and multiple non-source tasks (one-input,
two-input, multiple-input), multiple subclasses would share the same
implementation, causing code repetition. Currently the PoC introduces a new
level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but
what makes it more complicated is that StreamingIterationHead extends
OneInputStreamTask yet needs to perform checkpoints like a source task.
Don't we currently have the same problem? Even right now source tasks
and non-source tasks behave differently when it comes to checkpoints.
Are you saying we should fix that or would the new work introduce even
more duplicate code?
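
If duplication is the concern, the composition idea from point 3 might apply
here as well. A rough sketch, with entirely hypothetical names
(CheckpointTrigger, ImmediateTrigger, BarrierHandlerTrigger, TaskSketch) and
a string log standing in for the real work:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical strategy for how a task reacts to triggerCheckpoint. */
interface CheckpointTrigger {
    void trigger(long checkpointId, List<String> log);
}

/** Source-like behavior: perform the checkpoint right away. */
final class ImmediateTrigger implements CheckpointTrigger {
    public void trigger(long checkpointId, List<String> log) {
        log.add("snapshot " + checkpointId);
    }
}

/** Non-source behavior: hand the request over to the barrier handler. */
final class BarrierHandlerTrigger implements CheckpointTrigger {
    public void trigger(long checkpointId, List<String> log) {
        log.add("notify-handler " + checkpointId);
    }
}

/** Stand-in for a task; the trigger behavior is injected, not inherited. */
final class TaskSketch {
    final String name;
    final List<String> log = new ArrayList<>();
    private final CheckpointTrigger trigger;

    TaskSketch(String name, CheckpointTrigger trigger) {
        this.name = name;
        this.trigger = trigger;
    }

    void triggerCheckpoint(long checkpointId) {
        trigger.trigger(checkpointId, log);
    }
}
```

In such a setup an iteration head could stay a one-input task and simply be
constructed with the immediate trigger, so no extra level in the class
hierarchy would be needed. I have not checked how well this fits the
existing StreamTask code, though.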