Hi Guowei,
Thanks a lot for your feedback.
Your advice is really helpful. I've updated FLIP-245[1] to include
these parts.
> First of all, please complete the fault-tolerant processing flow in the
> FLIP.

After an execution is created and its source operator becomes ready to
receive events, subtaskReady is called, and SpeculativeSourceCoordinator
stores the mapping from SubtaskGateway to execution attempt in
SpeculativeSourceCoordinatorContext.
Then the source operator registers its reader with the coordinator, and
SpeculativeSourceCoordinator stores the mapping from source reader to
execution attempt in SpeculativeSourceCoordinatorContext.
If the execution fails over, subtaskFailed is called, and
SpeculativeSourceCoordinator clears the information about this execution,
including its source reader and SubtaskGateway.
If all current executions of the execution vertex have failed,
subtaskReset is called, and SpeculativeSourceCoordinator clears all
information about these executions and adds their splits back to the
split enumerator of the source.
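To make the flow above concrete, here is a minimal, simplified model of
the bookkeeping, written as a standalone sketch rather than against
Flink's real classes. The class SpeculativeContextModel, its fields, and
the simplified method signatures are hypothetical; gateways, readers and
splits are plain strings, and it assumes all attempts of a subtask share
the same assigned splits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the per-attempt bookkeeping described
// above. Not Flink's API: gateways, readers and splits are plain strings.
class SpeculativeContextModel {
    // subtask index -> (attempt number -> gateway of that attempt)
    final Map<Integer, Map<Integer, String>> gateways = new HashMap<>();
    // subtask index -> (attempt number -> registered reader of that attempt)
    final Map<Integer, Map<Integer, String>> readers = new HashMap<>();
    // splits assigned to a subtask (assumed shared by all of its attempts)
    final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // splits handed back to the split enumerator after a reset
    final List<String> splitsReturnedToEnumerator = new ArrayList<>();

    // Attempt is ready to receive events: remember its gateway.
    void subtaskReady(int subtask, int attempt, String gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, gateway);
    }

    // Attempt registered its reader: remember reader -> attempt.
    void registerReader(int subtask, int attempt, String readerId) {
        readers.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, readerId);
    }

    void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    // A single attempt failed: forget only that attempt's gateway and reader.
    void subtaskFailed(int subtask, int attempt) {
        Map<Integer, String> g = gateways.get(subtask);
        if (g != null) g.remove(attempt);
        Map<Integer, String> r = readers.get(subtask);
        if (r != null) r.remove(attempt);
    }

    // All attempts failed: clear everything and give the splits back.
    void subtaskReset(int subtask) {
        gateways.remove(subtask);
        readers.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) splitsReturnedToEnumerator.addAll(splits);
    }
}
```

The key design point is that a single failed attempt only removes its own
entries, so a concurrently running speculative attempt keeps its gateway
and reader; splits go back to the enumerator only on a full reset.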

> Secondly the FLIP only says that user-defined events are not supported,
> but it does not explain how to deal with the existing
> ReportedWatermarkEvent/ReaderRegistrationEvent.

For ReaderRegistrationEvent:
When the source operator registers its reader with the coordinator,
SpeculativeSourceCoordinator also stores the mapping from source reader
to execution attempt in SpeculativeSourceCoordinatorContext. Like
SourceCoordinator, it also needs to call SplitEnumerator#addReader to add
the new source reader.
Besides, to distinguish source readers of different executions,
'ReaderInfo' needs a new 'attemptId' field.
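A rough sketch of why the extra field matters: if equality only covered
the subtask index, two concurrent attempts of the same subtask would
collapse into one reader. AttemptAwareReaderInfo below is a hypothetical
stand-in, not Flink's actual ReaderInfo class.

```java
import java.util.Objects;

// Hypothetical stand-in for a ReaderInfo extended with an attempt id, so
// two concurrent attempts of the same subtask are distinct readers.
final class AttemptAwareReaderInfo {
    private final int subtaskId;
    private final int attemptId; // the new field proposed in the FLIP
    private final String location;

    AttemptAwareReaderInfo(int subtaskId, int attemptId, String location) {
        this.subtaskId = subtaskId;
        this.attemptId = attemptId;
        this.location = location;
    }

    int getSubtaskId() { return subtaskId; }
    int getAttemptId() { return attemptId; }
    String getLocation() { return location; }

    // Equality includes the attempt id, so attempts never overwrite each other.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AttemptAwareReaderInfo)) return false;
        AttemptAwareReaderInfo other = (AttemptAwareReaderInfo) o;
        return subtaskId == other.subtaskId
                && attemptId == other.attemptId
                && Objects.equals(location, other.location);
    }

    @Override
    public int hashCode() {
        return Objects.hash(subtaskId, attemptId, location);
    }
}
```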

For ReportedWatermarkEvent:
ReportedWatermarkEvent was introduced in 1.15 to support watermark
alignment in streaming mode.
Speculative execution is only enabled in batch mode. Therefore,
SpeculativeSourceCoordinator throws an exception if it receives a
ReportedWatermarkEvent.
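The event handling for these two cases could look roughly like the
dispatcher below. It is a simplified sketch: RegisterReader and
ReportWatermark are hypothetical stand-ins for ReaderRegistrationEvent and
ReportedWatermarkEvent, and the choice of UnsupportedOperationException is
an assumption, since the exact exception type is not specified.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified event types standing in for Flink's classes.
interface CoordinatorEvent {}

// Stands in for ReaderRegistrationEvent.
final class RegisterReader implements CoordinatorEvent {
    final String location;
    RegisterReader(String location) { this.location = location; }
}

// Stands in for ReportedWatermarkEvent.
final class ReportWatermark implements CoordinatorEvent {
    final long watermark;
    ReportWatermark(long watermark) { this.watermark = watermark; }
}

// Sketch of event dispatch in a coordinator for speculative (batch) sources.
final class SpeculativeEventDispatcher {
    // "subtask-attempt" -> location of the registered reader
    final Map<String, String> registeredReaders = new HashMap<>();

    void handleEventFromOperator(int subtask, int attempt, CoordinatorEvent event) {
        if (event instanceof RegisterReader) {
            // Key by subtask AND attempt so concurrent attempts stay separate.
            registeredReaders.put(subtask + "-" + attempt, ((RegisterReader) event).location);
            // A real coordinator would also call SplitEnumerator#addReader here.
        } else if (event instanceof ReportWatermark) {
            // Watermark alignment is a streaming feature; speculative
            // execution only runs in batch mode, so reject the event.
            throw new UnsupportedOperationException(
                    "ReportedWatermarkEvent is not supported with speculative execution");
        } else {
            throw new IllegalArgumentException("Unrecognized event: " + event);
        }
    }
}
```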

Besides, after an offline discussion with Jiangjie (Becket) Qin, I've
added support for SourceEvent, because it's useful for user-defined
sources that have a custom event protocol between reader and enumerator.
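One way to picture SourceEvent support under speculative execution is a
router that remembers the gateway of every attempt and delivers the
enumerator's response only to the attempt that sent the request. This is
a hypothetical sketch: SourceEventRouter, AttemptGateway, CustomSourceEvent
and the "ack:" reply are illustrative assumptions, not the FLIP's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical user-defined event exchanged between reader and enumerator.
final class CustomSourceEvent {
    final String payload;
    CustomSourceEvent(String payload) { this.payload = payload; }
}

// Hypothetical per-attempt gateway that records what it was sent.
final class AttemptGateway {
    final List<String> received = new ArrayList<>();
    void send(CustomSourceEvent event) { received.add(event.payload); }
}

final class SourceEventRouter {
    // "subtask-attempt" -> gateway of that execution attempt
    private final Map<String, AttemptGateway> gateways = new HashMap<>();

    void attemptReady(int subtask, int attempt, AttemptGateway gateway) {
        gateways.put(subtask + "-" + attempt, gateway);
    }

    // A reader sent a SourceEvent; the (hypothetical) enumerator replies
    // with an ack routed only to the attempt that sent the request.
    void handleSourceEvent(int subtask, int attempt, CustomSourceEvent event) {
        AttemptGateway gateway = gateways.get(subtask + "-" + attempt);
        if (gateway == null) {
            return; // attempt already failed or never registered: drop the reply
        }
        gateway.send(new CustomSourceEvent("ack:" + event.payload));
    }
}
```

Routing by attempt, rather than by subtask alone, keeps a reply meant for
one speculative attempt from being delivered to its sibling.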

Best,
Jing Zhang

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job

On Wed, Jun 29, 2022 at 18:06, Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Jing
>
> Thanks a lot for writing this FLIP, which is very useful to Batch users.
> Currently I have only two small questions:
>
> 1. First of all, please complete the fault-tolerant processing flow in the
> FLIP. (Maybe you've already considered it, but it's better to explicitly
> give the specific solution in the FLIP.)
> For example, how to handle Source `Reader` in case of error. As far as I
> know, once the reader is unavailable, it will result in the inability to
> allocate a new split, which may be unacceptable in the case of speculative
> execution.
>
> 2. Secondly the FLIP only says that user-defined events are not supported,
> but it does not explain how to deal with the existing
> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
> speculative execution, there may be two "same" tasks being executed at the
> same time. If these events are duplicated, there should still be a clear
> evaluation of whether they really have no effect on the execution of the job.
>
> Best,
> Guowei
>
>
> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>
> > Hi all,
> > One major problem of Flink batch jobs is slow tasks running on hot/bad
> > nodes, resulting in very long execution time.
> >
> > In order to solve this problem, FLIP-168: Speculative Execution for Batch
> > Job[1] was recently introduced and approved.
> >
> > Here, Zhu Zhu and I propose to support speculative execution of sources
> > as one of the follow-ups of FLIP-168. You can find more details in
> > FLIP-245[2].
> > Looking forward to your feedback.
> >
> > Best,
> > Jing Zhang
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
> >
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> >
>
