Hi all,
After an offline discussion with Jiangjie (Becket) Qin, Guowei, and Zhuzhu,
I've updated FLIP-245[1] to include:
1. The complete fault-tolerant processing flow.
2. Support for SourceEvent, because it's useful for user-defined
sources that have a custom event protocol between reader and enumerator.
3. How ReportedWatermarkEvent/ReaderRegistrationEvent messages are handled.
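
As a rough illustration of the bookkeeping behind these points (this is not
code from the FLIP), here is a minimal Java sketch. The class
SpeculativeBookkeepingSketch, the EventKind enum, and all method signatures
are hypothetical stand-ins that do not use Flink's classes. Tracking splits
per subtask and representing them as strings are also assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in, NOT Flink's SpeculativeSourceCoordinator: every name and
// signature below is an assumption made for illustration only.
final class SpeculativeBookkeepingSketch {

    enum EventKind { READER_REGISTRATION, SOURCE_EVENT, REPORTED_WATERMARK }

    // subtask index -> attempts whose gateway is ready to receive events
    private final Map<Integer, Set<Integer>> readyAttempts = new HashMap<>();
    // subtask index -> attempts that have registered a source reader
    private final Map<Integer, Set<Integer>> registeredReaders = new HashMap<>();
    // subtask index -> splits assigned to that subtask (assumed granularity)
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // splits handed back to the split enumerator (the enumerator is not modelled)
    private final List<String> returnedSplits = new ArrayList<>();
    private int forwardedSourceEvents = 0;

    // Called once an attempt of the source operator can receive events.
    void subtaskReady(int subtask, int attempt) {
        readyAttempts.computeIfAbsent(subtask, k -> new HashSet<>()).add(attempt);
    }

    void handleEvent(EventKind kind, int subtask, int attempt) {
        switch (kind) {
            case READER_REGISTRATION:
                if (!readyAttempts.getOrDefault(subtask, Collections.emptySet()).contains(attempt)) {
                    throw new IllegalStateException("attempt is not ready: " + attempt);
                }
                registeredReaders.computeIfAbsent(subtask, k -> new HashSet<>()).add(attempt);
                break;
            case SOURCE_EVENT:
                // The proposal passes these on to the enumerator; here we only count them.
                forwardedSourceEvents++;
                break;
            case REPORTED_WATERMARK:
                // Watermark alignment is streaming-only, speculative execution is batch-only.
                throw new UnsupportedOperationException("ReportedWatermarkEvent in batch mode");
        }
    }

    void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    // One attempt failed: forget only that attempt's gateway and reader.
    void subtaskFailed(int subtask, int attempt) {
        removeAttempt(readyAttempts, subtask, attempt);
        removeAttempt(registeredReaders, subtask, attempt);
    }

    // All attempts of the subtask failed: clear everything and give the splits back.
    void subtaskReset(int subtask) {
        readyAttempts.remove(subtask);
        registeredReaders.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            returnedSplits.addAll(splits);
        }
    }

    Set<Integer> readers(int subtask) {
        return new HashSet<>(registeredReaders.getOrDefault(subtask, Collections.emptySet()));
    }

    List<String> returnedSplits() {
        return new ArrayList<>(returnedSplits);
    }

    int forwardedSourceEvents() {
        return forwardedSourceEvents;
    }

    private static void removeAttempt(Map<Integer, Set<Integer>> map, int subtask, int attempt) {
        Set<Integer> attempts = map.get(subtask);
        if (attempts != null) {
            attempts.remove(attempt);
            if (attempts.isEmpty()) {
                map.remove(subtask);
            }
        }
    }
}
```

In this sketch, a failure of one attempt leaves the other attempt's reader
registered and the splits untouched. Only subtaskReset, which follows the
failure of all attempts, returns the splits.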

Please review FLIP-245[1] again. I look forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job

On Fri, Jul 1, 2022 at 6:02 PM Jing Zhang <beyond1...@gmail.com> wrote:

> Hi Guowei,
> Thanks a lot for your feedback.
> Your advice is really helpful. I've updated FLIP-245[1] to include
> these parts.
> > First of all, please complete the fault-tolerant processing flow in the
> FLIP.
>
> After an execution is created and a source operator becomes ready to
> receive events, subtaskReady is called, and SpeculativeSourceCoordinator
> stores the mapping of SubtaskGateway to execution attempt in
> SpeculativeSourceCoordinatorContext.
> The source operator then registers its reader with the coordinator, and
> SpeculativeSourceCoordinator stores the mapping of source reader to
> execution attempt in SpeculativeSourceCoordinatorContext.
> If the execution goes through a failover, subtaskFailed is called, and
> SpeculativeSourceCoordinator clears the information about this execution,
> including its source reader and SubtaskGateway.
> If all current executions of the execution vertex have failed,
> subtaskReset is called, and SpeculativeSourceCoordinator clears all
> information about these executions and adds their splits back to the
> split enumerator of the source.
>
> > Secondly the FLIP only says that user-defined events are not supported,
> but it does not explain how to deal with the existing
> ReportedWatermarkEvent/ReaderRegistrationEvent.
>
> For ReaderRegistrationEvent:
> When the source operator registers the reader with the coordinator,
> SpeculativeSourceCoordinator also stores the mapping of source reader
> to execution attempt in SpeculativeSourceCoordinatorContext. Like
> SourceCoordinator, it also needs to call SplitEnumerator#addReader to add
> the new source reader.
> Besides, in order to distinguish the source readers of different
> executions, 'ReaderInfo' needs an additional 'attemptId' field.
>
> For ReportedWatermarkEvent:
> ReportedWatermarkEvent was introduced in 1.15 to support
> watermark alignment in streaming mode.
> Speculative execution is only enabled in batch mode. Therefore,
> SpeculativeSourceCoordinator throws an exception if it receives a
> ReportedWatermarkEvent.
>
> Besides, after an offline discussion with Jiangjie (Becket) Qin, I've added
> support for SourceEvent because it's useful for user-defined sources
> that have a custom event protocol between reader and enumerator.
>
> Best,
> Jing Zhang
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>
> On Wed, Jun 29, 2022 at 6:06 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Jing
>>
>> Thanks a lot for writing this FLIP, which is very useful to batch users.
>> Currently I have only two small questions:
>>
>> 1. First of all, please complete the fault-tolerant processing flow in the
>> FLIP. (Maybe you've already considered it, but it's better to explicitly
>> give the specific solution in the FLIP.)
>> For example, how is the source `Reader` handled in case of an error? As far
>> as I know, once the reader is unavailable, new splits can no longer be
>> allocated, which may be unacceptable in the case of speculative
>> execution.
>>
>> 2. Secondly the FLIP only says that user-defined events are not supported,
>> but it does not explain how to deal with the existing
>> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
>> speculative execution, there may be two "same" tasks being executed at the
>> same time. If these events are repeated, it still needs to be clearly
>> evaluated whether they really have no effect on the execution of the job.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>
>> > Hi all,
>> > One major problem of Flink batch jobs is slow tasks running on hot/bad
>> > nodes, resulting in very long execution time.
>> >
>> > In order to solve this problem, FLIP-168: Speculative Execution for Batch
>> > Job[1] was introduced and approved recently.
>> >
>> > Here, Zhu Zhu and I propose to support speculative execution of sources as
>> > one of the follow-ups to FLIP-168. You can find more details in FLIP-245[2].
>> > Looking forward to your feedback.
>> >
>> > Best,
>> > Jing Zhang
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
>> >
>> > [2]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>> >
>>
>
