[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2021-12-26 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465314#comment-17465314
 ] 

wangwj commented on FLINK-10644:


[~Jiangang]
We are now redesigning the interface, so we need to wait.

> Batch Job: Speculative execution
> 
>
> Key: FLINK-10644
> URL: https://issues.apache.org/jira/browse/FLINK-10644
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: JIN SUN
>Assignee: BoWang
>Priority: Major
>  Labels: stale-assigned
>
> Stragglers/outliers are tasks that run slower than most of the other tasks in a 
> batch job. They increase job latency, because a straggler usually sits on 
> the critical path of the job and becomes the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation, 
> software misconfiguration, or noisy neighbors. It is hard for the JM to predict 
> the runtime.
> To reduce the overhead of stragglers, other systems such as Hadoop/Tez and Spark 
> have *_speculative execution_*. Speculative execution is a health-check 
> procedure that looks for tasks to be speculated, i.e. tasks in an 
> ExecutionJobVertex that run slower than the median of all successfully completed 
> tasks in that EJV. Such slow tasks are re-submitted to another TM. The slow 
> task is not stopped; a new copy runs in parallel, and the remaining copies are 
> killed as soon as one of them completes.
> This JIRA is an umbrella for applying this idea in Flink. Details will be 
> appended later.
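
The median-based check in the description above can be sketched roughly as follows. This is only an illustration, not Flink code: the class `StragglerDetector`, its method names, and the `slowdownFactor` parameter are assumptions introduced here (the description only says "slower than the median"; a factor of 1.0 reproduces that literally).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper, not a Flink class: flags running tasks that are slower than the median. */
final class StragglerDetector {

    /** Median of the given durations in milliseconds; the list must be non-empty. */
    static long median(List<Long> finishedDurationsMs) {
        List<Long> sorted = new ArrayList<>(finishedDurationsMs);
        Collections.sort(sorted);
        int n = sorted.size();
        // For an even count, average the two middle values.
        return n % 2 == 1
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2;
    }

    /**
     * Returns the indices of running tasks whose elapsed time exceeds
     * median(finished) * slowdownFactor. Returns an empty list while no task
     * has finished, because there is no baseline yet.
     */
    static List<Integer> findStragglers(
            List<Long> finishedDurationsMs, List<Long> runningElapsedMs, double slowdownFactor) {
        List<Integer> stragglers = new ArrayList<>();
        if (finishedDurationsMs.isEmpty()) {
            return stragglers;
        }
        double threshold = median(finishedDurationsMs) * slowdownFactor;
        for (int i = 0; i < runningElapsedMs.size(); i++) {
            if (runningElapsedMs.get(i) > threshold) {
                stragglers.add(i);
            }
        }
        return stragglers;
    }
}
```

A scheduler would presumably run such a check periodically per ExecutionJobVertex and start a second attempt for every index it returns.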



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2021-06-12 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17362264#comment-17362264
 ] 

wangwj commented on FLINK-10644:


[~trohrmann]
Hi Till,
I have finished the FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job
Could you please review it first? Or should I send an e-mail directly to 
d...@flink.apache.org for discussion? 
Looking forward to your reply.
Thanks~



[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2021-05-31 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354777#comment-17354777
 ] 

wangwj commented on FLINK-10205:


[~Ryantaocer] [~isunjin]
Hi, excuse me.
After reading this issue in detail, I have a question. In a batch job, each 
InputSplit is processed exactly once, but when a task fails over it may not 
process the same InputSplits as it did before the failover.
Does this problem still exist now that this patch has been merged?
I am working on speculative execution, so I would like to discuss this with you.
Thanks~


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.1, 1.6.2, 1.7.0
>Reporter: JIN SUN
>Assignee: ryantaocer
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>   Original Estimate: 168h
>  Time Spent: 0.5h
>  Remaining Estimate: 167.5h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous execution. This can introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in the batch 
> scenario), these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits in the ExecutionVertex so 
> that the DataSourceTask pulls its splits from there.
>  document:
> [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]
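
One way to picture the proposal above is a per-vertex log of the splits already handed out, so that a rerun or a concurrent attempt replays the same sequence. The sketch below is an assumption-laden illustration, not the merged patch: `VertexSplitLog`, `splitAt`, and the use of `String` as a stand-in for an InputSplit are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch, not Flink's implementation: one instance per ExecutionVertex. */
final class VertexSplitLog {
    // Splits not yet handed to any vertex; shared by all vertices of the source.
    private final Iterator<String> sharedSource;
    // Splits this vertex has received so far, in the order they were assigned.
    private final List<String> assigned = new ArrayList<>();

    VertexSplitLog(Iterator<String> sharedSource) {
        this.sharedSource = sharedSource;
    }

    /**
     * Returns the split at position index for any execution attempt of this vertex.
     * An attempt that asks for an index already served gets the recorded split;
     * only a request past the end of the log pulls a fresh split from the shared source.
     */
    Optional<String> splitAt(int index) {
        if (index < 0) {
            throw new IllegalArgumentException("index must be non-negative");
        }
        // Lock on the shared source so vertices do not race on the same iterator.
        synchronized (sharedSource) {
            while (assigned.size() <= index && sharedSource.hasNext()) {
                assigned.add(sharedSource.next());
            }
            return index < assigned.size()
                    ? Optional.of(assigned.get(index))
                    : Optional.empty();
        }
    }
}
```

Each attempt keeps its own counter and calls `splitAt(0)`, `splitAt(1)`, and so on, so a restarted attempt and a speculative attempt both see exactly the splits the first attempt saw.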





[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2021-05-19 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347457#comment-17347457
 ] 

wangwj commented on FLINK-10644:


[~trohrmann]
I already have permissions to create a FLIP in the wiki.
Thanks~
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Help-Who-can-add-permission-in-FLIP-td50755.html




[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-05-18 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347291#comment-17347291
 ] 

wangwj edited comment on FLINK-10644 at 5/19/21, 4:20 AM:
--

[~trohrmann]
Hi Till,
After reading the Flink code (master branch), I wrote a very simple POC and 
tested it successfully in a standalone cluster.
May I now write a FLIP in the Flink wiki for our discussion on 
d...@flink.apache.org?
Thanks~





[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-05-01 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332438#comment-17332438
 ] 

wangwj edited comment on FLINK-10644 at 5/1/21, 2:45 PM:
-

[~trohrmann]
Hi, I have implemented this feature, and it has had a very significant effect in 
our production cluster.
We are colleagues at Alibaba, so I will talk with you in detail on DingTalk.





[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-04-30 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337504#comment-17337504
 ] 

wangwj edited comment on FLINK-10644 at 4/30/21, 4:53 PM:
--

Hi, [~trohrmann] 
[~wind_ljy]
I think the Flink version closest to the Blink version I built this feature 
on is probably 1.7 or 1.8.
Although that seems a little far from the latest version of Flink, after reading 
the code of Blink and Flink (master branch) I found that the code I want to 
modify is not much different. So I am confident that I can contribute this 
feature.



By multi-threading in the ExecutionGraph I mean two executions of the same 
ExecutionVertex finishing at the same time: ExecutionVertex.executionFinished() 
may then be called for two different executions at the same time. Calling 
it "multi-threading" is perhaps not very accurate.
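
To make the concern concrete, here is a minimal sketch of one way to let exactly one of two simultaneously finishing attempts win. It is not Flink's ExecutionVertex and not the author's implementation; `SpeculativeVertex`, the attempt IDs, and the choice of an atomic compare-and-set are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch, not Flink's ExecutionVertex. */
final class SpeculativeVertex {
    // ID of the attempt that finished first; null while no attempt has finished.
    private final AtomicReference<String> winner = new AtomicReference<>();

    /**
     * Called when an attempt finishes. Returns true only for the first caller;
     * any later attempt gets false and should be canceled by the caller.
     */
    boolean executionFinished(String attemptId) {
        return winner.compareAndSet(null, attemptId);
    }

    /** Returns the ID of the winning attempt, or null if none has finished yet. */
    String getWinner() {
        return winner.get();
    }
}
```

An alternative that avoids the race altogether is to deliver all finish notifications on a single thread, in which case a plain field would be enough.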

How does the speculative execution play together with other sinks? Does it only 
work for the file based sinks?

Speculative execution could also support sinks to key-value databases, such 
as Hologres, HBase, etc. In our scenario, the batch job usually writes data 
into Hologres (similar to HBase) or Pangu (similar to HDFS).





How does the blacklisting mechanism work? Does it work also for the K8s and 
Mesos integration or only for the Yarn integration?
The blacklist module is a thread that maintains the blacklisted machines of this 
job and periodically removes expired entries. Each entry in the blacklist 
contains an IP and a timestamp. The timestamp is used to decide whether the 
entry has expired. 
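
The blacklist described above could look roughly like the following. This is a sketch under assumptions, not the author's code: the class `HostBlacklist`, the fixed time-to-live `ttlMs`, and passing the current time in explicitly are choices made here to keep the example small and testable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a per-job blacklist with expiring entries. */
final class HostBlacklist {
    // IP address -> time (ms) at which the machine was blacklisted.
    private final Map<String, Long> addedAtMs = new ConcurrentHashMap<>();
    // How long an entry stays in the blacklist (assumed to be a fixed value).
    private final long ttlMs;

    HostBlacklist(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Adds a machine, or refreshes its timestamp if it is already listed. */
    void add(String ip, long nowMs) {
        addedAtMs.put(ip, nowMs);
    }

    /** Removes every entry whose age has reached the TTL; meant to be called periodically. */
    void removeExpired(long nowMs) {
        addedAtMs.values().removeIf(addedAt -> nowMs - addedAt >= ttlMs);
    }

    /** Returns true if the machine is currently blacklisted. */
    boolean contains(String ip) {
        return addedAtMs.containsKey(ip);
    }
}
```

The periodic cleanup could be driven by a `ScheduledExecutorService` that calls `removeExpired(System.currentTimeMillis())` at a fixed rate, which would match the "thread" in the description.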

My code only supports the Yarn integration. But as far as I know, in the K8s 
integration we could use node affinity or pod affinity to achieve the same 
goal as the Yarn PlacementConstraint. As the Mesos integration will be 
deprecated in Flink 1.13, I didn't consider it.

How much is the change encapsulated by the SchedulerNG interface?
I agree that the SchedulerNG interface is more or less self-contained, and 
I will consider your proposal carefully in the FLIP and in the code.





In the next step, I'll figure out what changes are needed in Flink 
(master branch) and then try to write a POC.
Then I will send an e-mail to d...@flink.apache.org to discuss this feature.
Then I will write a FLIP and start a vote on it.

Thanks






[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2021-04-30 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337504#comment-17337504
 ] 

wangwj commented on FLINK-10644:


Hi, [~trohrmann] 
[~wind_ljy]
The Flink version closest to the Blink version I built this feature on is
1.5.1.
Although that is some distance from the latest Flink version, after reading the
code of both Blink and Flink (master branch) I found that the code I want to
modify has not changed much. So I am confident that I can contribute this
feature.



By multi-threading in the ExecutionGraph I mean two executions of the same
ExecutionVertex finishing at the same time: ExecutionVertex.executionFinished()
may then be called for two different executions at once. Calling this
"multi-threading" may not be very accurate.

How does the speculative execution play together with other sinks? Does it only 
work for the file based sinks?

Speculative execution can also support sinks that write to key-value
databases, such as Hologres and HBase.
In our scenario, batch jobs usually write data
into Hologres (similar to HBase) or Pangu (similar to HDFS).





How does the blacklisting mechanism work? Does it work also for the K8s and 
Mesos integration or only for the Yarn integration?
The blacklist module is a thread that maintains the blacklisted machines of
this job and periodically removes expired entries. Each entry in the blacklist
contains an IP address and a timestamp. The timestamp is used to decide whether
the entry has expired.

My code only supports the Yarn integration. But as far as I know, in the K8s
integration we could use node affinity or pod affinity to achieve the same goal
as the Yarn PlacementConstraint. As the Mesos integration will be deprecated in
Flink 1.13, I didn't consider it.

How much is the change encapsulated by the SchedulerNG interface?
I agree that the SchedulerNG interface is more or less self-contained, and I
will consider your proposal carefully in the FLIP and in the code.





As a next step, I'll figure out what changes are needed in Flink (master
branch) and then write a POC.
Then I will send an e-mail to d...@flink.apache.org to discuss this feature.
After that I will write a FLIP and start a vote on it.

Thanks








[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-04-30 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334427#comment-17334427
 ] 

wangwj edited comment on FLINK-10644 at 4/30/21, 4:15 PM:
--

[~trohrmann]
Hi Till.

I am from the search and recommendation department of Alibaba in China. Happy
to share and discuss my work here.
Our big data processing platform uses Flink Batch to process very large amounts
of data every day. Many long-tail tasks occur every day, and we have to kill
these processes manually, which leads to a poor user experience. So I tried to
solve this problem.

As I see it, speculative execution means that two executions of an
ExecutionVertex run at the same time, while failover means that two executions
run at different times. Based on this, I think this feature (speculative
execution) is theoretically achievable. So I have implemented speculative
execution for batch jobs based on Blink, and it has had a significant effect in
our production cluster.

What I did is as follows:

(1) Which kind of ExecutionJobVertex is suitable for enabling the speculative
execution feature in a batch job?
The speculative execution feature is tied to the implementation details of
region failover. So I decided that an ExecutionJobVertex enables speculative
execution only if all input edges and output edges of this ExecutionJobVertex
are blocking (Condition A).
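
Condition A can be read as a simple predicate over a vertex's edges. The
following is a minimal sketch in Python, written for brevity and not taken from
Flink or Blink: the EdgeType, JobVertex and is_speculation_eligible names are
hypothetical, and only the "all input and output edges are blocking" rule comes
from the description above.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class EdgeType(Enum):
    BLOCKING = "blocking"
    PIPELINED = "pipelined"


@dataclass
class JobVertex:
    """Hypothetical stand-in for an ExecutionJobVertex."""
    name: str
    input_edges: List[EdgeType] = field(default_factory=list)
    output_edges: List[EdgeType] = field(default_factory=list)


def is_speculation_eligible(vertex: JobVertex) -> bool:
    """Condition A: every input edge and every output edge is blocking."""
    edges = vertex.input_edges + vertex.output_edges
    return all(edge is EdgeType.BLOCKING for edge in edges)
```

Note that in this sketch a vertex without any edges is treated as eligible;
whether that matches the real implementation is not stated in the comment.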

(2) How to identify a long-tail task?
I identify a long-tail task by the interval between the current time and the
time the execution was first created/deployed, before any failover. When an
ExecutionJobVertex meets Condition A and a configurable percentage of its
executions have finished, the speculative execution thread actually starts
working. Within the ExecutionJobVertex, when the running time of an execution
is greater than a configurable multiple of the median running time of the
other finished executions, this execution is defined as a long-tail execution
(Condition B).
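
Condition B could look roughly like the sketch below. It is an illustration
under assumptions, not the Blink implementation: the function name, the
parameter names and the default values (0.75 and 1.5) are made up here; the
comment above only says that both the percentage and the multiple are
configurable.

```python
from statistics import median
from typing import Dict, List


def find_long_tail(
    finished_runtimes: List[float],
    running_runtimes: Dict[str, float],
    total_executions: int,
    finished_ratio: float = 0.75,  # assumed default; configurable
    slow_multiple: float = 1.5,    # assumed default; configurable
) -> List[str]:
    """Return the ids of running executions that satisfy Condition B."""
    if total_executions == 0 or not finished_runtimes:
        return []
    # Do nothing until enough executions of the vertex have finished.
    if len(finished_runtimes) / total_executions < finished_ratio:
        return []
    threshold = slow_multiple * median(finished_runtimes)
    return [eid for eid, runtime in running_runtimes.items() if runtime > threshold]
```

For example, with six finished executions whose median running time is 10.5
and a multiple of 1.5, any execution that has been running for more than 15.75
would be flagged.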

(3) How to make the speculative execution algorithm more precise?
Based on the speculative execution algorithm in Condition B, I solved the
problem of long-tail tasks in our production cluster.
As a next step, we may add the throughput of the task to the speculative
execution algorithm, reported through the heartbeat between the TaskManagers
and the JobManager.

(4) How to schedule another execution in the same ExecutionVertex?
We changed currentExecution in ExecutionVertex to a list, which means that an
ExecutionVertex can have multiple executions at the same time.
Then we reuse the existing scheduling logic to schedule the speculative
execution.

(5) How to make the speculative task run on a different machine from the
original execution?
We have implemented a per-job, machine-level blacklist. A machine's IP is added
to the blacklist when an execution on it is recognized as a long-tail
execution, and is removed when the entry expires.
When executions are scheduled, we add the blacklist information to the Yarn
PlacementConstraint. In this way, I can ensure that the Yarn container is not
allocated on a blacklisted machine.
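
A blacklist with expiring entries can be sketched as follows. This is a
hypothetical, single-threaded illustration: the MachineBlacklist class and its
method names are invented for this example, the background thread mentioned
elsewhere in this discussion is replaced by an explicit remove_expired() call,
and the hand-off to the Yarn PlacementConstraint is not shown.

```python
import time
from typing import Dict, Optional, Set


class MachineBlacklist:
    """Hypothetical per-job blacklist: machine IP -> time it was added."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._added_at: Dict[str, float] = {}

    def add(self, ip: str, now: Optional[float] = None) -> None:
        """Blacklist a machine, e.g. when a long-tail execution is found."""
        self._added_at[ip] = time.monotonic() if now is None else now

    def remove_expired(self, now: Optional[float] = None) -> None:
        """Drop entries older than the TTL; meant to be called periodically."""
        current = time.monotonic() if now is None else now
        self._added_at = {
            ip: ts for ip, ts in self._added_at.items() if current - ts < self._ttl
        }

    def excluded_hosts(self, now: Optional[float] = None) -> Set[str]:
        """Hosts the scheduler should avoid when requesting a container."""
        self.remove_expired(now)
        return set(self._added_at)
```

The optional now argument exists only to make the sketch easy to test; a real
implementation would also need to guard the map against concurrent access.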

(6) How to avoid errors when multiple executions finish at the same time in an
ExecutionVertex?
In the ExecutionVertex executionFinished() method, multi-thread synchronization
is used to ensure that only one execution finishes successfully in an
ExecutionVertex. All the other executions go through the cancellation logic.
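
The "first finisher wins" rule can be illustrated with a lock around the
finish callback. The sketch below is an assumption about the general
technique, not the actual executionFinished() code: the VertexAttempts class
and its fields are hypothetical.

```python
import threading
from typing import List, Optional


class VertexAttempts:
    """Hypothetical holder for all concurrent attempts of one vertex."""

    def __init__(self, attempt_ids: List[str]) -> None:
        self._lock = threading.Lock()
        self._attempt_ids = list(attempt_ids)
        self.winner: Optional[str] = None

    def execution_finished(self, attempt_id: str) -> List[str]:
        """Record a finished attempt.

        Returns the attempts to cancel if this call won the race, or an
        empty list if another attempt had already finished first.
        """
        with self._lock:
            if self.winner is not None:
                return []
            self.winner = attempt_id
            return [a for a in self._attempt_ids if a != attempt_id]
```

Whichever attempt enters the locked section first becomes the winner; a later
call for the other attempt sees that a winner exists and does nothing more.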

(7) How to deal with multiple sink files in one ExecutionVertex when the job
sinks to files?
When a batch job sinks to files, we add an executionAttemptID suffix to the
file name.

Finally, I delete or rename these files in finalizeOnMaster().

Here we should pay attention to the case of a Flink streaming job processing
bounded data sets.
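
The suffix-then-finalize idea might be sketched like this. The helper names,
the "<base>.<attemptId>" naming scheme and the winners mapping are assumptions
made for illustration; the comment above only states that an
executionAttemptID suffix is added and that files are deleted or renamed in
finalizeOnMaster().

```python
import os
from typing import Dict


def attempt_file_name(base_name: str, attempt_id: str) -> str:
    """File name used while an attempt is writing: '<base>.<attemptId>'."""
    return f"{base_name}.{attempt_id}"


def finalize_outputs(directory: str, winners: Dict[str, str]) -> None:
    """Keep only the winning attempt's file for each base name.

    `winners` maps a base file name to the id of the attempt that finished.
    """
    for file_name in os.listdir(directory):
        base_name, _, attempt_id = file_name.rpartition(".")
        if base_name not in winners:
            continue  # not an attempt file this sketch knows about
        path = os.path.join(directory, file_name)
        if winners[base_name] == attempt_id:
            # Winner: strip the suffix so readers see the final name.
            os.replace(path, os.path.join(directory, base_name))
        else:
            # Loser: its output is discarded.
            os.remove(path)
```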

(8) In a batch job with an all-to-all shuffle, how do the downstream original
execution and speculative execution select the ResultSubPartition of the
upstream executions?

Two executions of an upstream ExecutionVertex produce two ResultPartitions.
When the upstream ExecutionVertex finishes, we update the inputChannel of the
downstream execution to point to the upstream execution that finished first.
Here we should pay attention to the case where the downstream execution hits a
DataConsumptionException. It will restart with the upstream execution that
has finished.
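
Choosing which upstream partition a downstream execution reads can be reduced
to "pick the attempt that finished first". The following sketch shows only
that selection step; the UpstreamAttempt record and the select_partition
function are hypothetical, and updating the real input channels is not
modelled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class UpstreamAttempt:
    """Hypothetical record for one execution of an upstream vertex."""
    partition_id: str
    finish_time: Optional[float]  # None while the attempt is still running


def select_partition(attempts: List[UpstreamAttempt]) -> Optional[str]:
    """Return the partition of the attempt that finished first, if any."""
    finished = [a for a in attempts if a.finish_time is not None]
    if not finished:
        return None
    return min(finished, key=lambda a: a.finish_time).partition_id
```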

(9) How to display information about speculative tasks in the Flink web UI?
With this feature implemented, when the speculative execution runs faster than
the original execution, the Flink UI shows that this task has been cancelled.
But the result of the job is correct, which fully meets our expectations.
I don't know much about the web UI, so I will ask my colleague for help.

[~trohrmann]
My implementation has played a big role in our product cluster in 

[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-04-30 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334427#comment-17334427
 ] 

wangwj edited comment on FLINK-10644 at 4/30/21, 2:40 PM:
--

[~trohrmann]
Hi Till.

I am from the search and recommendation department of Alibaba in China. Happy
to share and discuss my work here.
Our big data processing platform uses Flink Batch to process very large amounts
of data every day. Many long-tail tasks occur every day, and we have to kill
these processes manually, which leads to a poor user experience. So I tried to
solve this problem.

As I see it, speculative execution means that two executions of an
ExecutionVertex run at the same time, while failover means that two tasks run
at different times. Based on this, I think this feature (speculative
execution) is theoretically achievable. So I have implemented speculative
execution for batch jobs based on Blink, and it has had a significant effect in
our production cluster.

What I did is as follows:

(1) Which kind of ExecutionJobVertex is suitable for enabling the speculative
execution feature in a batch job?
The speculative execution feature is tied to the implementation details of
region failover. So I decided that an ExecutionJobVertex enables speculative
execution only if all input edges and output edges of this ExecutionJobVertex
are blocking (Condition A).

(2) How to identify a long-tail task?
I identify a long-tail task by the interval between the current time and the
time the execution was first created/deployed, before any failover. When an
ExecutionJobVertex meets Condition A and a configurable percentage of its
executions have finished, the speculative execution thread actually starts
working. Within the ExecutionJobVertex, when the running time of an execution
is greater than a configurable multiple of the median running time of the
other finished executions, this execution is defined as a long-tail execution
(Condition B).

(3) How to make the speculative execution algorithm more precise?
Based on the speculative execution algorithm in Condition B, I solved the
problem of long-tail tasks in our production cluster.
As a next step, we may add the throughput of the task to the speculative
execution algorithm, reported through the heartbeat between the TaskManagers
and the JobManager.

(4) How to schedule another execution in the same ExecutionVertex?
We changed currentExecution in ExecutionVertex to a list, which means that an
ExecutionVertex can have multiple executions at the same time.
Then we reuse the existing scheduling logic to schedule the speculative
execution.

(5) How to make the speculative task run on a different machine from the
original execution?
We have implemented a per-job, machine-level blacklist. A machine's IP is added
to the blacklist when an execution on it is recognized as a long-tail
execution, and is removed when the entry expires.
When executions are scheduled, we add the blacklist information to the Yarn
PlacementConstraint. In this way, I can ensure that the Yarn container is not
allocated on a blacklisted machine.

(6) How to avoid errors when multiple executions finish at the same time in an
ExecutionVertex?
In the ExecutionVertex executionFinished() method, multi-thread synchronization
is used to ensure that only one execution finishes successfully in an
ExecutionVertex. All the other executions go through the cancellation logic.

(7) How to deal with multiple sink files in one ExecutionVertex when the job
sinks to files?
When a batch job sinks to files, we add an executionAttemptID suffix to the
file name.

Finally, I delete or rename these files in finalizeOnMaster().

Here we should pay attention to the case of a Flink streaming job processing
bounded data sets.

(8) In a batch job with an all-to-all shuffle, how do the downstream original
execution and speculative execution select the ResultSubPartition of the
upstream executions?

Two executions of an upstream ExecutionVertex produce two ResultPartitions.
When the upstream ExecutionVertex finishes, we update the inputChannel of the
downstream execution to point to the upstream execution that finished first.
Here we should pay attention to the case where the downstream execution hits a
DataConsumptionException. It will restart with the upstream execution that
has finished.

(9) How to display information about speculative tasks in the Flink web UI?
With this feature implemented, when the speculative execution runs faster than
the original execution, the Flink UI shows that this task has been cancelled.
But the result of the job is correct, which fully meets our expectations.
I don't know much about the web UI, so I will ask my colleague for help.

[~trohrmann]
My implementation has played a big role in our production cluster at Alibaba.

[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-04-29 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334427#comment-17334427
 ] 

wangwj edited comment on FLINK-10644 at 4/29/21, 3:42 PM:
--

[~trohrmann]
Hi Till.

I am from the search and recommendation department of Alibaba in China. Happy
to share and discuss my work here.
Our big data processing platform uses Flink Batch to process very large amounts
of data every day. Many long-tail tasks occur every day, and we have to kill
these processes manually, which leads to a poor user experience. So I tried to
solve this problem.

As I see it, speculative execution means that two executions of an
ExecutionVertex run at the same time, while failover means that two tasks run
at different times. Based on this, I think this feature (speculative
execution) is theoretically achievable. So I have implemented speculative
execution for batch jobs based on Blink, and it has had a significant effect in
our production cluster.

What I did is as follows:

(1) Which kind of ExecutionJobVertex is suitable for enabling the speculative
execution feature in a batch job?
The speculative execution feature is tied to the implementation details of
region failover. So I decided that an ExecutionJobVertex enables speculative
execution only if all input edges and output edges of this ExecutionJobVertex
are blocking (Condition A).

(2) How to identify a long-tail task?
I identify a long-tail task by the interval between the current time and the
time the execution was first created/deployed, before any failover. When an
ExecutionJobVertex meets Condition A and a configurable percentage of its
executions have finished, the speculative execution thread actually starts
working. Within the ExecutionJobVertex, when the running time of an execution
is greater than a configurable multiple of the median running time of the
other finished executions, this execution is defined as a long-tail execution.

(3) How to make the speculative execution algorithm more precise?
Based on the speculative execution algorithm in (2), I can completely solve the
long-tail problem in our production cluster.
As a next step, we may add the throughput of the task to the speculative
execution algorithm, reported through the heartbeat between the TaskManager
and the JobManager.

(4) How to schedule another execution in the same ExecutionVertex?
We changed currentExecution in ExecutionVertex to a list, which means that an
ExecutionVertex can have multiple executions at the same time.
Then we reuse the existing scheduling logic to schedule the speculative
execution.

(5) How to make the speculative task run on a different machine from the
original task?
We have implemented a machine-level blacklist, and we add a machine's IP to the
blacklist when an execution on it is a long-tail execution according to the
speculative execution algorithm in (2). Blacklist entries expire after a
timeout.
When scheduling executions, we add the blacklist information to the Yarn
PlacementConstraint.
In this way, I can ensure that the Yarn container is not allocated on a
blacklisted machine.

(6) How to avoid errors when multiple executions finish at the same time in an
ExecutionVertex?
In the ExecutionVertex executionFinished() method, I added multi-thread
synchronization to ensure that an ExecutionVertex eventually has only one
successfully finished execution; all other executions go through the
cancellation logic.

(7) How to deal with multiple sink files in one ExecutionVertex when the job
sinks to files?
When a batch job sinks to files, we add an executionAttemptID suffix to the
file name.
Finally, in finalizeOnMaster(), I delete or rename the files.
Here we should pay attention to the case of a Flink streaming job processing
bounded data sets.

(8) In a batch job with an all-to-all shuffle, how do we let the downstream
original execution and speculative execution know which ResultSubPartition of
the upstream task to read?
Two executions of an upstream ExecutionVertex produce two ResultPartitions.
When the upstream ExecutionVertex has finished, we update the input channel of
the downstream execution to point to the upstream execution that finished
first.
Here we should pay attention to the case where the downstream execution hits a
DataConsumptionException. It will restart with the upstream execution that
has finished.

(9) How to display information about speculative tasks in the Flink web UI?
With this feature implemented, when the speculative execution runs faster than
the original execution, the Flink UI shows that this task has been cancelled.
But the result of the job is correct, which fully meets our expectations.
I don't know much about the web UI, so I will ask my colleague for help.

[~trohrmann]
My implementation has played a big role in our production cluster at Alibaba.
Happy 

[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-04-27 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334427#comment-17334427
 ] 

wangwj edited comment on FLINK-10644 at 4/28/21, 3:31 AM:
--

[~trohrmann]
Hi Till.

I am from the search and recommendation department of Alibaba in China.
Our big data processing platform uses Flink Batch to process very large amounts
of data every day.
There are many long-tail tasks every day. We can only log in to the machine and
kill the process manually, which seriously affects the experience of users. So
I made up my mind to solve this problem.

First of all, I think that speculative execution means that two executions of
an ExecutionVertex run at the same time, while failover means that two tasks
run at different times. Based on this, I think this feature (speculative
execution) is definitely achievable. In the end, the facts proved that my idea
was right.

So I have implemented speculative execution for batch jobs based on Blink, and
it has a very significant effect in our production cluster. My approach is as
follows; I am happy to discuss it.

(1) Which kind of ExecutionJobVertex is suitable for enabling the speculative
execution feature in a batch job?
The speculative execution feature involves the implementation details of
region failover. After some research, I decided that an ExecutionJobVertex
enables speculative execution only if all input edges and output edges of this
ExecutionJobVertex are blocking.


(2) How to identify a long-tail task?
I identify a long-tail task based on the current time and the time the
execution was first created/deployed, before any failover.
The following applies to an ExecutionJobVertex that meets the condition in (1).
When a configurable percentage of executions have finished in an
ExecutionJobVertex, the speculative execution thread actually starts working.
Within an ExecutionJobVertex, when the running time of an execution reaches a
configurable multiple of the median running time of the other finished
executions, this execution is judged to be a long-tail execution.


(3) How to make the speculative execution algorithm more precise?
Based on the speculative execution algorithm in (2), I can completely solve the
long-tail problem in our production cluster.
As a next step, we may add the throughput of the task to the speculative
execution algorithm, reported through the heartbeat between the TaskManager
and the JobManager.


(4) How to schedule another execution in the same ExecutionVertex?
We changed currentExecution in ExecutionVertex to a list, which means that an
ExecutionVertex can have multiple executions at the same time.
Then we reuse the existing scheduling logic to schedule the speculative
execution.


(5) How to make the speculative task run on a different machine from the
original task?
We have implemented a machine-level blacklist, and we add a machine's IP to the
blacklist when an execution on it is a long-tail execution according to the
speculative execution algorithm in (2). Blacklist entries expire after a
timeout.
When scheduling executions, we add the blacklist information to the Yarn
PlacementConstraint.
In this way, I can ensure that the Yarn container is not allocated on a
blacklisted machine.

(6) How to avoid errors when multiple executions finish at the same time in an
ExecutionVertex?
In the ExecutionVertex executionFinished() method, I added multi-thread
synchronization to ensure that an ExecutionVertex eventually has only one
successfully finished execution; all other executions go through the
cancellation logic.


(7) How to deal with multiple sink files in one ExecutionVertex when the job
sinks to files?
When a batch job sinks to files, we add an executionAttemptID suffix to the
file name.
Finally, in finalizeOnMaster(), I delete or rename the files.
Here we should pay attention to the case of a Flink streaming job processing
bounded data sets.


(8) In a batch job with an all-to-all shuffle, how do we let the downstream
original execution and speculative execution know which ResultSubPartition of
the upstream task to read?
Two executions of an upstream ExecutionVertex produce two ResultPartitions.
When the upstream ExecutionVertex has finished, we update the input channel of
the downstream execution to point to the upstream execution that finished
first.
Here we should pay attention to the case where the downstream execution hits a
DataConsumptionException. It will restart with the upstream execution that
has finished.


(9) How to display information about speculative tasks in the Flink web UI?
With this feature implemented, when the speculative execution runs faster than
the original execution, the Flink UI shows that this task has been cancelled.
But the result of the job is correct, which fully meets our expectations.
I don't know much about the web UI, so I will ask my colleague for help.

[~trohrmann]
My implementation has 

[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-04-27 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334427#comment-17334427
 ] 

wangwj edited comment on FLINK-10644 at 4/28/21, 3:21 AM:
--

[~trohrmann]
Hi Till.

I am from the search and recommendation department of Alibaba in China.
Our big data processing platform uses Flink Batch to process very large amounts
of data every day.
There are many long-tail tasks every day. We can only log in to the machine and
kill the process manually, which seriously affects the experience of users. So
I made up my mind to solve this problem.

First of all, I think that speculative execution means that two executions of
an ExecutionVertex run at the same time, while failover means that two tasks
run at different times. Based on this, I think this feature (speculative
execution) is definitely achievable. In the end, the facts proved that my idea
was right.

So I have implemented speculative execution for batch jobs based on Blink, and
it has a very significant effect in our production cluster. My approach is as
follows; I am happy to discuss it.

(1) Which kind of ExecutionJobVertex is suitable for enabling the speculative
execution feature in a batch job?
The speculative execution feature involves the implementation details of
region failover. After some research, I decided that an ExecutionJobVertex
enables speculative execution only if all input edges and output edges of this
ExecutionJobVertex are blocking.


(2) How to identify a long-tail task?
I identify a long-tail task based on the current time and the time the
execution was first created/deployed, before any failover.
The following applies to an ExecutionJobVertex that meets the condition in (1).
When a configurable percentage of executions have finished in an
ExecutionJobVertex, the speculative execution thread actually starts working.
Within an ExecutionJobVertex, when the running time of an execution reaches a
configurable multiple of the median running time of the other finished
executions, this execution is judged to be a long-tail execution.


(3) How to make the speculative execution algorithm more precise?
Based on the speculative execution algorithm in (2), I can completely solve the
long-tail problem in our production cluster.
As a next step, we may add the throughput of the task to the speculative
execution algorithm, reported through the heartbeat between the TaskManager
and the JobManager.


(4) How to schedule another execution in the same ExecutionVertex?
We changed currentExecution in ExecutionVertex to a list, which means that an
ExecutionVertex can have multiple executions at the same time.
Then we reuse the existing scheduling logic to schedule the speculative
execution.


(5) How to make the speculative task run on a different machine from the
original task?
We have implemented a machine-level blacklist, and we add a machine's IP to the
blacklist when an execution on it is a long-tail execution according to the
speculative execution algorithm in (2). Blacklist entries expire after a
timeout.
When scheduling executions, we add the blacklist information to the Yarn
PlacementConstraint.
In this way, I can ensure that the Yarn container is not allocated on a
blacklisted machine.

(6) How to avoid errors when multiple executions finish at the same time in an
ExecutionVertex?
In the ExecutionVertex executionFinished() method, I added multi-thread
synchronization to ensure that an ExecutionVertex eventually has only one
successfully finished execution; all other executions go through the
cancellation logic.


(7) How to deal with multiple sink files in one ExecutionVertex when the job
sinks to files?
When a batch job sinks to files, we add an executionAttemptID suffix to the
file name.
Finally, in finalizeOnMaster(), I delete or rename the files.
Here we should pay attention to the case of a Flink streaming job processing
bounded data sets.


(8) In a batch job with an all-to-all shuffle, how do we let the downstream
original execution and speculative execution know which ResultSubPartition of
the upstream task to read?
Two executions of an upstream ExecutionVertex produce two ResultPartitions.
When the upstream ExecutionVertex has finished, we update the input channel of
the downstream execution to point to the upstream execution that finished
first.
Here we should pay attention to the case where the downstream execution hits a
DataConsumptionException. It will restart with the upstream execution that
has finished.


(9) How to display information about speculative tasks in the Flink web UI?
With this feature implemented, when the speculative execution runs faster than
the original execution, the Flink UI shows that this task has been cancelled.
But the result of the job is correct, which fully meets our expectations.
I don't know much about the web UI, so I will ask my colleague for help.

[~trohrmann]
I am very interested in 

[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2021-04-27 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334427#comment-17334427
 ] 

wangwj commented on FLINK-10644:


[~trohrmann]
Hi Till.

I am from the search and recommendation department of Alibaba in China.
Our big data processing platform uses Flink Batch to process very large amounts
of data every day.
There are many long-tail tasks every day. We can only log in to the machine and
kill the process manually, which seriously affects the experience of our
users. So I made up my mind to solve this problem.

First of all, I think that speculative execution means that two executions of
an ExecutionVertex run at the same time, while failover means that two tasks
run at different times. Based on this, I think this feature (speculative
execution) is definitely achievable. In the end, the facts proved that my idea
was right.

So I have implemented speculative execution for batch jobs based on Blink, and
it has a very significant effect in our production cluster. My approach is as
follows; I am happy to discuss it.

(1) Which kind of ExecutionJobVertex is suitable for enabling the speculative
execution feature in a batch job?
The speculative execution feature involves the implementation details of
region failover. After some research, I decided that an ExecutionJobVertex
enables speculative execution only if all input edges and output edges of this
ExecutionJobVertex are blocking.


(2) How to identify a long-tail task?
I identify a long-tail task based on the current time and the time the
execution was first created/deployed, before any failover.
The following applies to an ExecutionJobVertex that meets the condition in (1).
When a configurable percentage of executions have finished in an
ExecutionJobVertex, the speculative execution thread actually starts working.
Within an ExecutionJobVertex, when the running time of an execution reaches a
configurable multiple of the median running time of the other finished
executions, this execution is judged to be a long-tail execution.


(3) How to make the speculative execution algorithm more precise?
Based on the speculative execution algorithm in (2), I can completely solve the
long-tail problem in our production cluster.
As a next step, we will add the throughput of the task to the speculative
execution algorithm, reported through the heartbeat between the TaskManager
and the JobManager.


(4) How to schedule another execution in the same ExecutionVertex?
We changed currentExecution in ExecutionVertex to a list, which means that an
ExecutionVertex can have multiple executions at the same time.
Then we reuse the existing scheduling logic to schedule the speculative
execution.


(5) How to make the speculative task run on a different machine from the
original task?
We have implemented a machine-level blacklist, and we add a machine's IP to the
blacklist when an execution on it is a long-tail execution according to the
speculative execution algorithm in (2). Blacklist entries expire after a
timeout.


(6) How to avoid errors when multiple executions finish at the same time in an
ExecutionVertex?
In the ExecutionVertex executionFinished() method, I added multi-thread
synchronization to ensure that an ExecutionVertex eventually has only one
successfully finished execution; all other executions go through the
cancellation logic.


(7)How to deal with multiple sink files in one ExecutionVertex when the job 
sinks to files?
When a batch job sinks to files, we add an executionAttemptID suffix to the 
file name.
Finally, in finalizeOnMaster(), I delete or rename these files.
Here we should pay attention to the case of a Flink streaming job processing 
bounded data sets.
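
The delete-or-rename step in (7) can be sketched as a pure planning function 
for the files of a single ExecutionVertex. Everything here is an assumption 
made for illustration: the class AttemptFileFinalizer, the "." separator 
between base name and attempt ID, and the use of plain strings for file names. 
The real logic would run inside finalizeOnMaster() against the file system.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plan of file operations for one ExecutionVertex. */
final class AttemptFileFinalizer {
    /** attempt-suffixed file name -> final file name */
    final Map<String, String> renames = new LinkedHashMap<>();
    /** files written by attempts that did not win */
    final List<String> deletes = new ArrayList<>();

    /** Name an attempt writes to, e.g. "part-0" + "." + attemptId. */
    static String attemptFileName(String baseName, String attemptId) {
        return baseName + "." + attemptId;
    }

    /** Keeps the winning attempt's files (suffix stripped), drops the rest. */
    static AttemptFileFinalizer plan(List<String> files, String winnerId) {
        AttemptFileFinalizer plan = new AttemptFileFinalizer();
        String suffix = "." + winnerId;
        for (String file : files) {
            if (file.endsWith(suffix)) {
                String finalName =
                        file.substring(0, file.length() - suffix.length());
                plan.renames.put(file, finalName);
            } else {
                plan.deletes.add(file);
            }
        }
        return plan;
    }
}
```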


(8)In a batch job with an all-to-all shuffle, how do we let the downstream 
original execution and speculative execution know which ResultSubPartition of 
the upstream task to read?
Two executions of an upstream ExecutionVertex produce two ResultPartitions. 
When the edge between upstream and downstream is blocking, once the upstream 
ExecutionVertex has finished we update the input channel of the downstream 
execution to point to the upstream execution that finished first.
Here we should pay attention to the case where the downstream execution 
encounters a DataConsumptionException: it restarts with the upstream execution 
that has finished.
Here we should pay attention to the case of a Flink streaming job processing 
bounded data sets.
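
Choosing which upstream ResultPartition the downstream input channel should 
point to, as described in (8), amounts to picking the attempt that finished 
first. The sketch below illustrates only that selection; 
UpstreamPartitionSelector and Attempt are hypothetical types, and a negative 
finish time is an assumed convention for "not finished".

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical selector for the upstream partition to consume. */
final class UpstreamPartitionSelector {

    /** One upstream attempt and the partition it produces. */
    static final class Attempt {
        final String partitionId;
        final long finishTimeMillis; // negative if the attempt has not finished

        Attempt(String partitionId, long finishTimeMillis) {
            this.partitionId = partitionId;
            this.finishTimeMillis = finishTimeMillis;
        }
    }

    /** Partition of the attempt that finished earliest, if any finished. */
    static Optional<String> fastestFinished(List<Attempt> attempts) {
        Attempt best = null;
        for (Attempt attempt : attempts) {
            if (attempt.finishTimeMillis < 0) {
                continue; // still running or cancelled
            }
            if (best == null
                    || attempt.finishTimeMillis < best.finishTimeMillis) {
                best = attempt;
            }
        }
        return best == null
                ? Optional.empty()
                : Optional.of(best.partitionId);
    }
}
```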


(9)How to display information about the speculative task in the Flink web UI?
I have implemented this feature. Currently, when the speculative execution 
runs faster than the original execution, the Flink UI shows that the task has 
been cancelled. The result of the job is nevertheless correct, which fully 
meets our expectations.
I don't know much about the web front end, so I will ask a colleague for help.

[~trohrmann]
I am very interested in this issue, and my implementation has played a big role 
in our product 

[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2021-04-26 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332438#comment-17332438
 ] 

wangwj edited comment on FLINK-10644 at 4/26/21, 2:24 PM:
--

[~trohrmann]
Hi, I have implemented this feature, and it has a very significant effect in 
our production cluster.
We are both colleagues at Alibaba; I will discuss the details with you on 
DingTalk.


was (Author: wangwj):
[~trohrmann]
Hi, I implemented this feature, and it has a very significant effect in our 
cluster.
We are both colleagues at Alibaba; I will discuss the details with you on 
DingTalk.



[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2021-04-26 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332438#comment-17332438
 ] 

wangwj commented on FLINK-10644:


[~trohrmann]
Hi, I implemented this feature, and it has a very significant effect in our 
cluster.
We are both colleagues at Alibaba; I will discuss the details with you on 
DingTalk.



[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2020-12-07 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245062#comment-17245062
 ] 

wangwj commented on FLINK-10644:


[~QiLuo]
I will look into this work.
How is your progress on it?

