[jira] [Issue Comment Deleted] (FLINK-18295) Remove the hack logics of result consumers

2021-08-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-18295:

Comment: was deleted

(was: This issue was marked "stale-assigned" and has not received an update in 
7 days. It is now automatically unassigned. If you are still working on it, you 
can assign it to yourself again. Please also give an update about the status of 
the work.)

> Remove the hack logics of result consumers
> --
>
> Key: FLINK-18295
> URL: https://issues.apache.org/jira/browse/FLINK-18295
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently an {{IntermediateDataSet}} can have multiple {{JobVertex}} as its 
> consumers. That's why the consumers of an {{IntermediateResultPartition}} are 
> kept as a nested list ({{List>}}).
> However, in the scheduler / {{ExecutionGraph}} there is an assumption that one 
> {{IntermediateResultPartition}} can be consumed by only one 
> {{ExecutionJobVertex}}. This results in a lot of hacky logic which assumes 
> that the partition consumers consist of a single inner list.
> We should remove this hacky logic. The idea is to change 
> {{IntermediateResultPartition#consumers}} to be a single {{List}}. 
> The {{ExecutionGraph}} building logic should be adjusted accordingly, with the 
> assumption that an {{IntermediateResult}} can have only one consumer vertex. In 
> {{JobGraph}}, there should also be a check for this assumption.
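
A minimal sketch of the idea, using hypothetical stand-in types rather than the real Flink classes, of what the flattened consumer bookkeeping and the {{JobGraph}}-side check could look like:

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hedged illustration only; names and shapes are assumptions, not the actual Flink API. */
final class ResultPartitionSketch {

    // A single flat list of consumer vertices instead of a list of lists,
    // relying on the assumption that only one consumer JobVertex exists.
    private final List<String> consumerVertexIds = new ArrayList<>();

    private String consumerJobVertexId; // the single consumer JobVertex, once known

    void addConsumer(String jobVertexId, String executionVertexId) {
        // Check logic for the single-consumer assumption described in the ticket.
        if (consumerJobVertexId == null) {
            consumerJobVertexId = jobVertexId;
        } else if (!consumerJobVertexId.equals(jobVertexId)) {
            throw new IllegalStateException(
                    "A result partition may only be consumed by one JobVertex, but got a second: "
                            + jobVertexId);
        }
        consumerVertexIds.add(executionVertexId);
    }

    List<String> getConsumers() {
        return consumerVertexIds;
    }
}
{code}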



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-18295) Remove the hack logics of result consumers

2021-08-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-18295:

Comment: was deleted

(was: This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.
)

> Remove the hack logics of result consumers
> --
>
> Key: FLINK-18295
> URL: https://issues.apache.org/jira/browse/FLINK-18295
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently an {{IntermediateDataSet}} can have multiple {{JobVertex}} as its 
> consumers. That's why the consumers of an {{IntermediateResultPartition}} are 
> kept as a nested list ({{List>}}).
> However, in the scheduler / {{ExecutionGraph}} there is an assumption that one 
> {{IntermediateResultPartition}} can be consumed by only one 
> {{ExecutionJobVertex}}. This results in a lot of hacky logic which assumes 
> that the partition consumers consist of a single inner list.
> We should remove this hacky logic. The idea is to change 
> {{IntermediateResultPartition#consumers}} to be a single {{List}}. 
> The {{ExecutionGraph}} building logic should be adjusted accordingly, with the 
> assumption that an {{IntermediateResult}} can have only one consumer vertex. In 
> {{JobGraph}}, there should also be a check for this assumption.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-18295) Remove the hack logics of result consumers

2021-08-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-18295:

Comment: was deleted

(was: This issue is assigned but has not received an update in 7 days so it has 
been labeled "stale-assigned". If you are still working on the issue, please 
give an update and remove the label. If you are no longer working on the issue, 
please unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.)

> Remove the hack logics of result consumers
> --
>
> Key: FLINK-18295
> URL: https://issues.apache.org/jira/browse/FLINK-18295
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently an {{IntermediateDataSet}} can have multiple {{JobVertex}} as its 
> consumers. That's why the consumers of an {{IntermediateResultPartition}} are 
> kept as a nested list ({{List>}}).
> However, in the scheduler / {{ExecutionGraph}} there is an assumption that one 
> {{IntermediateResultPartition}} can be consumed by only one 
> {{ExecutionJobVertex}}. This results in a lot of hacky logic which assumes 
> that the partition consumers consist of a single inner list.
> We should remove this hacky logic. The idea is to change 
> {{IntermediateResultPartition#consumers}} to be a single {{List}}. 
> The {{ExecutionGraph}} building logic should be adjusted accordingly, with the 
> assumption that an {{IntermediateResult}} can have only one consumer vertex. In 
> {{JobGraph}}, there should also be a check for this assumption.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-18295) Remove the hack logics of result consumers

2021-08-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-18295:

Comment: was deleted

(was: I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I 
help the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this 
ticket is indeed Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.
)

> Remove the hack logics of result consumers
> --
>
> Key: FLINK-18295
> URL: https://issues.apache.org/jira/browse/FLINK-18295
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently an {{IntermediateDataSet}} can have multiple {{JobVertex}} as its 
> consumers. That's why the consumers of an {{IntermediateResultPartition}} are 
> kept as a nested list ({{List>}}).
> However, in the scheduler / {{ExecutionGraph}} there is an assumption that one 
> {{IntermediateResultPartition}} can be consumed by only one 
> {{ExecutionJobVertex}}. This results in a lot of hacky logic which assumes 
> that the partition consumers consist of a single inner list.
> We should remove this hacky logic. The idea is to change 
> {{IntermediateResultPartition#consumers}} to be a single {{List}}. 
> The {{ExecutionGraph}} building logic should be adjusted accordingly, with the 
> assumption that an {{IntermediateResult}} can have only one consumer vertex. In 
> {{JobGraph}}, there should also be a check for this assumption.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18295) Remove the hack logics of result consumers

2021-08-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-18295:

Priority: Major  (was: Minor)

> Remove the hack logics of result consumers
> --
>
> Key: FLINK-18295
> URL: https://issues.apache.org/jira/browse/FLINK-18295
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Currently an {{IntermediateDataSet}} can have multiple {{JobVertex}} as its 
> consumers. That's why the consumers of an {{IntermediateResultPartition}} are 
> kept as a nested list ({{List>}}).
> However, in the scheduler / {{ExecutionGraph}} there is an assumption that one 
> {{IntermediateResultPartition}} can be consumed by only one 
> {{ExecutionJobVertex}}. This results in a lot of hacky logic which assumes 
> that the partition consumers consist of a single inner list.
> We should remove this hacky logic. The idea is to change 
> {{IntermediateResultPartition#consumers}} to be a single {{List}}. 
> The {{ExecutionGraph}} building logic should be adjusted accordingly, with the 
> assumption that an {{IntermediateResult}} can have only one consumer vertex. In 
> {{JobGraph}}, there should also be a check for this assumption.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18295) Remove the hack logics of result consumers

2021-08-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-18295:

Labels:   (was: auto-deprioritized-major auto-unassigned)

> Remove the hack logics of result consumers
> --
>
> Key: FLINK-18295
> URL: https://issues.apache.org/jira/browse/FLINK-18295
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently an {{IntermediateDataSet}} can have multiple {{JobVertex}} as its 
> consumers. That's why the consumers of an {{IntermediateResultPartition}} are 
> kept as a nested list ({{List>}}).
> However, in the scheduler / {{ExecutionGraph}} there is an assumption that one 
> {{IntermediateResultPartition}} can be consumed by only one 
> {{ExecutionJobVertex}}. This results in a lot of hacky logic which assumes 
> that the partition consumers consist of a single inner list.
> We should remove this hacky logic. The idea is to change 
> {{IntermediateResultPartition#consumers}} to be a single {{List}}. 
> The {{ExecutionGraph}} building logic should be adjusted accordingly, with the 
> assumption that an {{IntermediateResult}} can have only one consumer vertex. In 
> {{JobGraph}}, there should also be a check for this assumption.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23879) Benchmark are not compiling

2021-08-19 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401986#comment-17401986
 ] 

Zhu Zhu commented on FLINK-23879:
-

cc [~arvid] [~dwysakowicz]

> Benchmark are not compiling
> ---
>
> Key: FLINK-23879
> URL: https://issues.apache.org/jira/browse/FLINK-23879
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Priority: Blocker
> Fix For: 1.14.0
>
>
> The benchmarks have not been compiling since Aug. 16th, 2021. The error is:
> {noformat}
> [2021-08-19T23:18:36.242Z] [ERROR] 
> /home/jenkins/workspace/flink-scheduler-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java:47:54:
>  error: package org.apache.flink.streaming.runtime.streamstatus does not exist
> [2021-08-19T23:18:36.242Z] [ERROR] 
> /home/jenkins/workspace/flink-scheduler-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java:350:40:
>  error: cannot find symbol{noformat}
> It seems to have been introduced by FLINK-23767, in which {{StreamStatus}} was 
> replaced with {{WatermarkStatus}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23806) StackOverflowException can happen if a large scale job failed to acquire enough slots in time

2021-08-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23806.
---
Resolution: Fixed

Fixed via
master/release-1.14: f543e9a97e2d2dda340d4d1d54467ffe060666cb
release-1.13: de16f34193799e7f3aade15b9bc57549f8010621
release-1.12: 5e83f3e6f3d9bef893a28e68b6ed2534589f1e30

> StackOverflowException can happen if a large scale job failed to acquire 
> enough slots in time
> -
>
> Key: FLINK-23806
> URL: https://issues.apache.org/jira/browse/FLINK-23806
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.12.5, 1.13.2
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.12.6, 1.13.3
>
>
> When requested slots are not fulfilled in time, task failure will be 
> triggered and all related tasks will be canceled and restarted. However, in 
> this process, if a task is already assigned a slot, the slot will be returned 
> to the slot pool and it will be immediately used to fulfill pending slot 
> requests of the tasks which will soon be canceled. The execution versions of 
> those tasks have already been bumped in 
> {{DefaultScheduler#restartTasksWithDelay(...)}}, so the assignment will 
> fail immediately and the slot will be returned to the slot pool and again 
> used to fulfill pending slot requests. A StackOverflow can happen this way 
> when there are many vertices, and the resulting fatal error can lead to a JM 
> crash. A sample call stack is attached below.
> To fix the problem, one way is to cancel the pending requests of all the 
> tasks which will soon be canceled (i.e. tasks with bumped versions) before 
> canceling these tasks.
> {panel}
> ...
> at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.removeLogicalSlotRequest(SharedSlot.java:242)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:542)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:505)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> 
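
For illustration, a hedged sketch of the proposed ordering, using hypothetical types rather than the real scheduler API: cancel the pending slot requests of the to-be-restarted tasks before cancelling the tasks themselves, so a returned slot can no longer be recursively re-assigned to requests that are about to fail anyway.

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hedged sketch only; types and method names are assumptions, not the real Flink scheduler. */
final class RestartOrderingSketch {

    static void restartTasks(
            Set<String> verticesToRestart,
            Map<String, CompletableFuture<Object>> pendingSlotRequests) {

        // 1) First cancel the pending slot requests of the affected tasks.
        for (String vertexId : verticesToRestart) {
            CompletableFuture<Object> pending = pendingSlotRequests.remove(vertexId);
            if (pending != null) {
                pending.cancel(false);
            }
        }

        // 2) Only then cancel the tasks; any slot they return can no longer be
        //    re-assigned to a request that is doomed to fail immediately.
        for (String vertexId : verticesToRestart) {
            cancelTask(vertexId);
        }
    }

    private static void cancelTask(String vertexId) {
        // Placeholder for the actual cancellation logic.
        System.out.println("cancelling " + vertexId);
    }
}
{code}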

[jira] [Assigned] (FLINK-23833) Cache of ShuffleDescriptors should be individually cleaned up

2021-08-17 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23833:
---

Assignee: Zhilong Hong

> Cache of ShuffleDescriptors should be individually cleaned up
> -
>
> Key: FLINK-23833
> URL: https://issues.apache.org/jira/browse/FLINK-23833
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
> Fix For: 1.14.0
>
>
> {color:#172b4d}In FLINK-23005, we introduced a cache of the compressed 
> serialized values of ShuffleDescriptors to improve the performance of 
> deployment. To make sure the cache does not stay around too long and become a 
> burden for GC, it is cleaned up when the partition is released or reset 
> for a new execution. In the implementation, the cache of the entire 
> IntermediateResult is cleaned up, because a partition is released only when 
> the entire IntermediateResult is released. {color}
> {color:#172b4d}However, since FLINK-22017, a BLOCKING result partition is 
> allowed to become consumable individually. This also means that a result 
> partition doesn't need to wait for other result partitions and can be 
> released individually. After this change, the following situation may occur: 
> when a result partition is finished, the cache of the IntermediateResult on 
> the blob store is deleted, while other result partitions of this 
> IntermediateResult are just being deployed to the TaskExecutor. When the 
> TaskExecutors then try to download the TDD from the blob store, they will find 
> that the blob has been deleted and get stuck.{color}
> {color:#172b4d}This bug only happens for jobs with POINTWISE BLOCKING edges. 
> It also requires {{blob.offload.minsize}} to be set to an extremely small 
> value, since the ShuffleDescriptors of POINTWISE BLOCKING edges are usually 
> small. To solve this issue, we just need to clean up the cache of 
> ShuffleDescriptors individually.{color}
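
A hedged sketch of the per-partition cleanup idea, with hypothetical names (the real cache lives in Flink's ShuffleDescriptor/blob handling and is shaped differently):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hedged illustration only: each result partition owns its cached serialized entry,
 * so releasing one partition no longer deletes cached data (e.g. an offloaded blob)
 * that sibling partitions of the same IntermediateResult still need.
 */
final class ShuffleDescriptorCacheSketch {

    private final Map<String, byte[]> cacheByPartitionId = new ConcurrentHashMap<>();

    void put(String partitionId, byte[] serializedShuffleDescriptor) {
        cacheByPartitionId.put(partitionId, serializedShuffleDescriptor);
    }

    byte[] get(String partitionId) {
        return cacheByPartitionId.get(partitionId);
    }

    /** Called when a single partition is released or reset; other entries stay cached. */
    void releasePartition(String partitionId) {
        cacheByPartitionId.remove(partitionId);
        // If the entry was offloaded to the blob store, only the blob belonging to
        // this partition would be deleted here as well.
    }
}
{code}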



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23826) Verify optimized scheduler performance for large-scale jobs

2021-08-16 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23826:

Description: 
This ticket is used to verify the result of FLINK-21110.
It should check whether large-scale jobs are scheduled correctly and measure the 
scheduling performance, with a real job running on a cluster. 

The conclusion should include, for a *1 --- all-to-all-connected -->1* 
job:
1. time of job initialization on master (job received -> scheduling started)
2. time of job scheduling and deployment (scheduling started -> all tasks in 
INITIALIZATION)
3. time of job restarting on task failover (JM notified about task failure -> 
all tasks in INITIALIZATION again)
4. master heap memory required



  was:
This ticket is used to verify the result of FLINK-21110.
It should check whether large-scale jobs are scheduled correctly and measure the 
scheduling performance, with a real job running on a cluster. 

The conclusion should include, for a *1 ---all-to-all-connected-->1* 
job:
1. time of job initialization on master (job received -> scheduling started)
2. time of job scheduling and deployment (scheduling started -> all tasks in 
INITIALIZATION)
3. time of job restarting on task failover (JM notified about task failure -> 
all tasks in INITIALIZATION again)
4. master heap memory required




> Verify optimized scheduler performance for large-scale jobs
> ---
>
> Key: FLINK-23826
> URL: https://issues.apache.org/jira/browse/FLINK-23826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.14.0
>
>
> This ticket is used to verify the result of FLINK-21110.
> It should check whether large-scale jobs are scheduled correctly and measure the 
> scheduling performance, with a real job running on a cluster. 
> The conclusion should include, for a *1 --- all-to-all-connected 
> -->1* job:
> 1. time of job initialization on master (job received -> scheduling started)
> 2. time of job scheduling and deployment (scheduling started -> all tasks in 
> INITIALIZATION)
> 3. time of job restarting on task failover (JM notified about task failure -> 
> all tasks in INITIALIZATION again)
> 4. master heap memory required



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23826) Verify optimized scheduler performance for large-scale jobs

2021-08-16 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23826:

Description: 
This ticket is used to verify the result of FLINK-21110.
It should check if large scale jobs' scheduling are working well and the 
scheduling performance, with a real job running on cluster. 

The conclusion should include, for a *1 ---all-to-all-connected-->1* 
job:
1. time of job initialization on master (job received -> scheduling started)
2. time of job scheduling and deployment (scheduling started -> all tasks in 
INITIALIZATION)
3. time of job restarting on task failover (JM notified about task failure -> 
all tasks in INITIALIZATION again)
4. master heap memory required



  was:
This ticket is used to verify the result of FLINK-21110.
It should check whether large-scale jobs are scheduled correctly and measure the 
scheduling performance, with a real job running on a cluster. 

The conclusion should include:
1. time of job initialization on master (job received -> scheduling started)
2. time of job scheduling and deployment (scheduling started -> all tasks in 
INITIALIZATION)
3. time of job restarting on task failover (JM notified about task failure -> 
all tasks in INITIALIZATION again)
4. master heap memory required for large scale jobs




> Verify optimized scheduler performance for large-scale jobs
> ---
>
> Key: FLINK-23826
> URL: https://issues.apache.org/jira/browse/FLINK-23826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.14.0
>
>
> This ticket is used to verify the result of FLINK-21110.
> It should check whether large-scale jobs are scheduled correctly and measure the 
> scheduling performance, with a real job running on a cluster. 
> The conclusion should include, for a *1 ---all-to-all-connected-->1* 
> job:
> 1. time of job initialization on master (job received -> scheduling started)
> 2. time of job scheduling and deployment (scheduling started -> all tasks in 
> INITIALIZATION)
> 3. time of job restarting on task failover (JM notified about task failure -> 
> all tasks in INITIALIZATION again)
> 4. master heap memory required



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23826) Verify optimized scheduler performance for large-scale jobs

2021-08-16 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-23826:
---

 Summary: Verify optimized scheduler performance for large-scale 
jobs
 Key: FLINK-23826
 URL: https://issues.apache.org/jira/browse/FLINK-23826
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Zhu Zhu
 Fix For: 1.14.0


This ticket is used to verify the result of FLINK-21110.
It should check whether large-scale jobs are scheduled correctly and measure the 
scheduling performance, with a real job running on a cluster. 

The conclusion should include:
1. time of job initialization on master (job received -> scheduling started)
2. time of job scheduling and deployment (scheduling started -> all tasks in 
INITIALIZATION)
3. time of job restarting on task failover (JM notified about task failure -> 
all tasks in INITIALIZATION again)
4. master heap memory required for large scale jobs





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23806) StackOverflowException can happen if a large scale job failed to acquire enough slots in time

2021-08-16 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23806:

Description: 
When requested slots are not fulfilled in time, task failure will be triggered 
and all related tasks will be canceled and restarted. However, in this process, 
if a task is already assigned a slot, the slot will be returned to the slot 
pool and it will be immediately used to fulfill pending slot requests of the 
tasks which will soon be canceled. The execution versions of those tasks have 
already been bumped in {{DefaultScheduler#restartTasksWithDelay(...)}}, so the 
assignment will fail immediately and the slot will be returned to the slot pool 
and again used to fulfill pending slot requests. A StackOverflow can happen 
this way when there are many vertices, and the resulting fatal error can lead to 
a JM crash. A sample call stack is attached below.

To fix the problem, one way is to cancel the pending requests of all the tasks 
which will soon be canceled (i.e. tasks with bumped versions) before canceling 
these tasks.

{panel}
...
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.removeLogicalSlotRequest(SharedSlot.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:542)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:505)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 

[jira] [Assigned] (FLINK-23806) StackOverflowException can happen if a large scale job failed to acquire enough slots in time

2021-08-16 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23806:
---

Assignee: Zhu Zhu

> StackOverflowException can happen if a large scale job failed to acquire 
> enough slots in time
> -
>
> Key: FLINK-23806
> URL: https://issues.apache.org/jira/browse/FLINK-23806
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.12.5, 1.13.2
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Critical
> Fix For: 1.14.0, 1.12.6, 1.13.3
>
>
> When requested slots are not fulfilled in time, task failure will be 
> triggered and all related tasks will be canceled and restarted. However, in 
> this process, if a task is already assigned a slot, the slot will be returned 
> to the slot pool and it will be immediately used to fulfill pending slot 
> requests of the tasks which will soon be canceled. The execution versions of 
> those tasks have already been bumped in 
> {{DefaultScheduler#restartTasksWithDelay(...)}}, so the assignment will 
> fail immediately and the slot will be returned to the slot pool and again 
> used to fulfill pending slot requests. A StackOverflow can happen this way 
> when there are many vertices, and the resulting fatal error can lead to a JM 
> crash. A sample call stack is attached below.
> To fix the problem, one way is to cancel the pending requests of all the 
> tasks which will soon be canceled (i.e. tasks with bumped versions) before 
> canceling these tasks.
> {panel}
> ...
> at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.removeLogicalSlotRequest(SharedSlot.java:242)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:542)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:505)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
>   

[jira] [Created] (FLINK-23806) StackOverflowException can happen if a large scale job failed to acquire enough slots in time

2021-08-16 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-23806:
---

 Summary: StackOverflowException can happen if a large scale job 
failed to acquire enough slots in time
 Key: FLINK-23806
 URL: https://issues.apache.org/jira/browse/FLINK-23806
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.2, 1.12.5
Reporter: Zhu Zhu
 Fix For: 1.14.0, 1.12.6, 1.13.3


When requested slots are not fulfilled in time, task failure will be triggered 
and all related tasks will be canceled and restarted. However, in this process, 
if a task is already assigned a slot, the slot will be returned to the slot 
pool and it will be immediately used to fulfill pending slot requests of the 
tasks which will soon be canceled. The execution versions of those tasks have 
already been bumped in {{DefaultScheduler#restartTasksWithDelay(...)}}, so the 
assignment will fail immediately and the slot will be returned to the slot pool 
and again used to fulfill pending slot requests. A StackOverflow can happen 
this way when there are many vertices, and the resulting fatal error can lead to 
a JM crash. A sample call stack is attached below.

To fix the problem, one way is to cancel the pending requests of all the tasks 
which will soon be canceled (i.e. tasks with bumped versions) before canceling 
these tasks.

{panel}
...
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.removeLogicalSlotRequest(SharedSlot.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:542)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:505)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 

[jira] [Closed] (FLINK-21110) Optimize scheduler performance for large-scale jobs

2021-08-16 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-21110.
---
Resolution: Done

> Optimize scheduler performance for large-scale jobs
> ---
>
> Key: FLINK-21110
> URL: https://issues.apache.org/jira/browse/FLINK-21110
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration of Group.jpg
>
>
> According to the result of scheduler benchmarks we implemented in 
> FLINK-20612, the bottleneck of deploying and running a large-scale job in 
> Flink is mainly focused on the following procedures:
> |Procedure|Time complexity|
> |Initializing ExecutionGraph|O(N^2)|
> |Building DefaultExecutionTopology|O(N^2)|
> |Initializing PipelinedRegionSchedulingStrategy|O(N^2)|
> |Scheduling downstream tasks when a task finishes|O(N^2)|
> |Calculating tasks to restart when a failover occurs|O(N^2)|
> |Releasing result partitions|O(N^3)|
> These procedures are all related to the complexity of the topology in the 
> ExecutionGraph. Between two vertices connected with all-to-all edges, all 
> the upstream IntermediateResultPartitions are connected to all downstream 
> ExecutionVertices. The computation complexity of building and traversing all 
> these edges will be O(N^2). 
> As for memory usage, currently we use ExecutionEdges to store the information 
> of connections. For the all-to-all distribution type, there are O(N^2) 
> ExecutionEdges. We test a simple job with only two vertices. The parallelisms 
> of them are both 10k. Furthermore, they are connected with all-to-all edges. 
> It takes 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges.
> In most large-scale jobs, there will be more than two vertices with large 
> parallelisms, and they would cost a lot of time and memory to deploy the job.
> As we can see, for two JobVertices connected with the all-to-all distribution 
> type, all IntermediateResultPartitions produced by the upstream 
> ExecutionVertices are isomorphic, which means that the downstream 
> ExecutionVertices they connect to are exactly the same. The downstream 
> ExecutionVertices belonging to the same JobVertex are also isomorphic, as the 
> upstream ResultPartitions they connect to are the same, too.
> Since every JobEdge has exactly one distribution type, we can divide the 
> vertices and result partitions into groups according to the distribution type 
> of the JobEdge. 
> For the all-to-all distribution type, since all downstream vertices are 
> isomorphic, they belong to a single group, and all the upstream result 
> partitions are connected to this group. Vice versa, all the upstream result 
> partitions also belong to a single group, and all the downstream vertices are 
> connected to this group. In the past, when we wanted to iterate all the 
> downstream vertices, we needed to loop over them n times, which leads to the 
> complexity of O(N^2). Now since all upstream result partitions are connected 
> to one downstream group, we just need to loop over them once, with the 
> complexity of O(N).
> For the pointwise distribution type, because each result partition is 
> connected to different downstream vertices, they should belong to different 
> groups. Vice versa, all the vertices belong to different groups. Since one 
> result partition group is connected to one vertex group pointwisely, the 
> computation complexity of looping over them is still O(N).
> !Illustration of Group.jpg|height=249!
> After we group the result partitions and vertices, ExecutionEdge is no longer 
> needed. For the test job we mentioned above, the optimization can effectively 
> reduce the memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) 
> in our POC. The time cost is reduced from 62.090 seconds to 8.551 seconds 
> (with 10k parallelism).
>  
> The detailed design doc: 
> https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
>  
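
For illustration, a self-contained sketch (hypothetical names, not the actual Flink classes) of how one shared consumer group replaces per-edge bookkeeping for an all-to-all edge:

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hedged illustration of the grouping idea: every upstream partition of an all-to-all
 * edge references one shared group of downstream vertices instead of each vertex
 * individually, so stored connections grow as O(N) instead of O(N^2).
 */
final class AllToAllGroupingSketch {

    static final class ConsumerGroup {
        final List<Integer> downstreamVertexIds = new ArrayList<>();
    }

    public static void main(String[] args) {
        int parallelism = 10_000;

        // One shared group for all downstream vertices of the all-to-all edge.
        ConsumerGroup group = new ConsumerGroup();
        for (int v = 0; v < parallelism; v++) {
            group.downstreamVertexIds.add(v);
        }

        // Each upstream partition references the same group object: N references
        // plus N group entries, instead of N * N individual edges.
        List<ConsumerGroup> consumersOfUpstreamPartitions = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            consumersOfUpstreamPartitions.add(group);
        }

        System.out.println(
                "stored references: " + (parallelism + group.downstreamVertexIds.size()));
    }
}
{code}

Running the sketch stores roughly 20k references for a 10k-to-10k all-to-all edge, instead of the 100M ExecutionEdges mentioned above.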



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22767) Optimize the initialization of LocalInputPreferredSlotSharingStrategy

2021-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22767.
---
Resolution: Done

Done via 94ae3dc0cb0a0a374b8f15fe49b09cc00ccf4c19

> Optimize the initialization of LocalInputPreferredSlotSharingStrategy
> -
>
> Key: FLINK-22767
> URL: https://issues.apache.org/jira/browse/FLINK-22767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Based on the scheduler benchmark introduced in FLINK-21731, we find that 
> during the initialization of {{LocalInputPreferredSlotSharingStrategy}}, 
> there's a procedure that has O(N^2) complexity: 
> {{ExecutionSlotSharingGroupBuilder#build}} located in 
> {{LocalInputPreferredSlotSharingStrategy}}.
> The original implementation is: 
> {code:java}
> for all SchedulingExecutionVertex in DefaultScheduler:
>   for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
> get the result partition's producer vertex and check whether the 
> ExecutionSlotSharingGroup in which the producer vertex is located is available 
> for the current vertex{code}
> This procedure has O(N^2) complexity.
> Instead of iterating over the ExecutionSlotSharingGroups of producer vertices 
> for all consumed result partitions, we can maintain a set of all available 
> ExecutionSlotSharingGroups for the consumed result partitions. Once a group 
> is assigned, it will be removed from the available group set. This will 
> decrease the complexity from O(N^2) to O(N).
> The optimization of this procedure will speed up the initialization of 
> DefaultScheduler. It will accelerate the submission of a new job, especially 
> for OLAP jobs.
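
A hedged sketch of the described O(N) approach, using hypothetical names rather than the real strategy classes: a set of still-available groups is consulted and shrunk once per assignment, instead of re-scanning the producer groups for every vertex.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hedged sketch only; the real ExecutionSlotSharingGroupBuilder is shaped differently. */
final class GroupAssignmentSketch {

    static Map<String, String> assignGroups(
            List<String> vertices, Map<String, String> candidateGroupPerVertex) {

        // All groups that could still be taken over from a producer vertex.
        Set<String> availableGroups = new HashSet<>(candidateGroupPerVertex.values());
        Map<String, String> assignment = new HashMap<>();

        for (String vertex : vertices) {
            String preferred = candidateGroupPerVertex.get(vertex);
            if (preferred != null && availableGroups.remove(preferred)) {
                // The producer's group (local input) is still free: reuse it.
                assignment.put(vertex, preferred);
            } else {
                // Otherwise a fresh group would be created in the real strategy.
                assignment.put(vertex, "new-group-for-" + vertex);
            }
        }
        return assignment;
    }
}
{code}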



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22773) Optimize the construction of pipelined regions

2021-08-11 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22773.
---
Resolution: Done

Done via
656901beb4617c356840e9ae677ad2c9e65fd8da
2cfbf35649acb78711790ff67f9043835907b8ac
4c7bc08807e03e025c761ac107d7625e689eb6a8
5d77c191cec26147941d1ea06804901bc4c344a1

> Optimize the construction of pipelined regions
> --
>
> Key: FLINK-22773
> URL: https://issues.apache.org/jira/browse/FLINK-22773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> During the initialization of DefaultExecutionTopology, pipelined regions will 
> be computed for scheduling. Currently the complexity of this procedure is 
> O(N^2):
>  
> {code:java}
> for all vertices in the topology:
>   for all consumed results of the vertex:
> if the consumed result is reconnectable:
>   merge the current region with its producer region
> {code}
> One possible solution is mentioned in FLINK-17330.
> If we can optimize this procedure from O(N^2) to O(N), it will speed up the 
> initialization of SchedulerNG, and accelerate the submission of a new job, 
> especially for OLAP jobs.
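
As a generic illustration only (not necessarily the approach taken in FLINK-17330), region merging can be made near-linear with a disjoint-set structure:

{code:java}
/**
 * Hedged sketch: a plain union-find (disjoint set), shown as one generic way to merge
 * producer/consumer regions in near-linear time.
 */
final class RegionUnionFind {
    private final int[] parent;

    RegionUnionFind(int vertexCount) {
        parent = new int[vertexCount];
        for (int i = 0; i < vertexCount; i++) {
            parent[i] = i;
        }
    }

    int find(int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]]; // path halving keeps trees flat
            v = parent[v];
        }
        return v;
    }

    /** Merge the region of a vertex with the region of the producer of a consumed result. */
    void union(int vertex, int producer) {
        parent[find(vertex)] = find(producer);
    }
}
{code}

With path halving, the amortized cost per merge is nearly constant, which is what brings the overall region construction close to O(N).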



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-08-11 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Comment: was deleted

(was: This issue was labeled "stale-critical" 7 days ago and has not received 
any updates so it is being deprioritized. If this ticket is actually Critical, 
please raise the priority and ask a committer to assign you the issue or revive 
the public discussion.
)

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Assignee: Gen Luo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, even though all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may be used to 
> fulfill the pending request of a CANCELED vertex; that assignment will fail 
> immediately, and the slot will be returned and used to fulfill another 
> vertex's request in a recursive way. A StackOverflow can happen this way when 
> there are many vertices, and the resulting fatal error can lead to a JM crash. 
> A sample call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment to prevent 
> similar StackOverflowExceptions from occurring.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
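
A hedged sketch, with hypothetical types, of the first part of the proposed fix: clearing pending slot requests on the CANCELING/FAILING transition so a returned slot cannot be recursively re-assigned to requests that are doomed to fail.

{code:java}
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hedged sketch only; not the real ExecutionSlotAllocator API. */
final class CancelPendingRequestsSketch {

    enum JobState { RUNNING, CANCELING, FAILING }

    static void onJobStateChange(
            JobState newState, Map<String, CompletableFuture<Object>> pendingSlotRequests) {

        if (newState == JobState.CANCELING || newState == JobState.FAILING) {
            // Clear all pending requests first; returned slots then simply go back
            // to the slot pool instead of fulfilling requests of canceled vertices.
            pendingSlotRequests.values().forEach(f -> f.cancel(false));
            pendingSlotRequests.clear();
        }
    }
}
{code}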

[jira] [Issue Comment Deleted] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-08-11 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22945:

Comment: was deleted

(was: I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I 
help the community manage its development. I see this issue has been marked as 
Critical but is unassigned and neither it nor its Sub-Tasks have been 
updated for 7 days. I have gone ahead and marked it "stale-critical". If this 
ticket is critical, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be 
deprioritized.
)

> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Assignee: Gen Luo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, even though all vertices will be canceled 
> and their assigned slots will be returned. A returned slot may be used to 
> fulfill the pending request of a CANCELED vertex; that assignment will fail 
> immediately, and the slot will be returned and used to fulfill another 
> vertex's request in a recursive way. A StackOverflow can happen this way when 
> there are many vertices, and the resulting fatal error can lead to a JM crash. 
> A sample call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment to prevent 
> similar StackOverflowExceptions from occurring.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396636#comment-17396636
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/11/21, 3:01 AM:
---

I think I found the cause of the major regression.

*Cause*
The major regression happened because FLINK-23372 disabled slot sharing for batch 
job tasks, while a default MiniCluster provides just 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the major 
regression.

Later, on 07-20, an [improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63] to flink-benchmarks changed the MiniCluster to be pre-launched with 1 task manager with 4 slots. This enabled the two source tasks of {{sortedTwoInput}} to run simultaneously again, and the major regression was gone. That is why we cannot reproduce the obvious regression by reverting FLINK-23372 on the latest master.
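
For reference, here is a minimal sketch of what the pre-launched MiniCluster in that improvement amounts to. It is only an illustration under my assumptions (the class name and job submission are placeholders, not the actual flink-benchmarks code):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class PreLaunchedMiniClusterSketch {
    public static void main(String[] args) throws Exception {
        // 1 task manager with 4 slots, matching the 07-20 flink-benchmarks change,
        // so both sources of sortedTwoInput can get a slot at the same time
        // even without slot sharing.
        MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
                .setConfiguration(new Configuration())
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(4)
                .build();

        try (MiniCluster miniCluster = new MiniCluster(config)) {
            miniCluster.start();
            // submit the benchmark job against this pre-launched cluster here
        }
    }
}
{code}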

This also explains:
- why the obvious regression only happened to {{sortedTwoInput}} and {{sortedMultiInput}} and not to {{sortedOneInput}};
- why the performance increase on 07-20 also only happened to {{sortedTwoInput}} and {{sortedMultiInput}}.

*Conclusion*
It is expected that more slots may be needed for a batch job to run its tasks simultaneously. However, this does not necessarily mean more resources are needed, because each slot can theoretically be smaller now that it is no longer shared. Therefore, this regression is expected and acceptable.
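
If a benchmark or job really wants particular tasks to share one slot again, slot sharing can still be requested explicitly per operator in the DataStream API. A hedged sketch under my assumptions (the topology, group name, and sequence bounds are illustrative only):

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitSlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Place both sources in the same named slot sharing group, so they may
        // be scheduled into a single slot even though batch mode no longer
        // shares slots by default after FLINK-23372.
        DataStream<Long> source1 = env.fromSequence(0, 1_000_000).slotSharingGroup("shared-sources");
        DataStream<Long> source2 = env.fromSequence(0, 1_000_000).slotSharingGroup("shared-sources");

        source1.union(source2).print();

        env.execute("explicit-slot-sharing-sketch");
    }
}
{code}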

Note that there still seems to be a minor regression (~1%) after applying FLINK-23372. The possible reason is explained above in Stephan's [comment|https://issues.apache.org/jira/browse/FLINK-23593?focusedCommentId=17393287=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17393287]. It is also acceptable in my opinion.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput sharing (right before 
FLINK-23372)|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput non-sharing (right after 
FLINK-23372)|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput sharing (right before 
FLINK-23372)|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput non-sharing (right after 
FLINK-23372)|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|
|latest sortedTwoInput sharing (reverting FLINK-23372 on latest master)|1938.716479|[#414|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/414/]|
|latest sortedTwoInput non-sharing on latest 
master|1926.685377|[#413|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/413/]|




was (Author: zhuzh):
I think I find the cause of the major regression.

*Cause*
The major regression happens because FLINK-23372 disables slot sharing of batch 
job tasks. And a default MiniCluster would just provide 1 task manager with 1 
slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the major 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the major regression was 
gone. And that's why we cannot reproduce the obvious regression by reverting 
FLINK-23372 on latest master.

This also explains that 
- why the obvious regression only happened to {{sortedTwoInput}} and 
{{sortedMultiInput}} and not to {{sortedOneInput}}. 
- why the performance increased on 07-20 and it also only happened to 
{{sortedTwoInput}} and {{sortedMultiInput}}

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

Note that there still seems to be minor regression (~1%) after applying 
FLINK-23372. The possible reason is explained above in Stephan's 
[comment|https://issues.apache.org/jira/browse/FLINK-23593?focusedCommentId=17393287=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17393287].
 It's also acceptable in my opinion.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput sharing (right before 

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396636#comment-17396636
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/11/21, 3:01 AM:
---

I think I found the cause of the major regression.

*Cause*
The major regression happens because FLINK-23372 disables slot sharing of batch 
job tasks. And a default MiniCluster would just provide 1 task manager with 1 
slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the major 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the major regression was 
gone. And that's why we cannot reproduce the obvious regression by reverting 
FLINK-23372 on latest master.

This also explains:
- why the obvious regression only happened to {{sortedTwoInput}} and {{sortedMultiInput}} and not to {{sortedOneInput}};
- why the performance increase on 07-20 also only happened to {{sortedTwoInput}} and {{sortedMultiInput}}.

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

Note that there still seems to be a minor regression (~1%) after applying 
FLINK-23372. The possible reason is explained above in Stephan's 
[comment|https://issues.apache.org/jira/browse/FLINK-23593?focusedCommentId=17393287=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17393287].
 It's also acceptable in my opinion.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput sharing (right before 
FLINK-23372)|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput non-sharing (right after 
FLINK-23372)|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput sharing (right before 
FLINK-23372)|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput non-sharing (right after 
FLINK-23372)|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|
|latest sortedTwoInput sharing (reverting FLINK-23372 on latest master)|1938.716479|[#414|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/414/]|
|latest sortedTwoInput non-sharing on latest master|1926.685377|[#413|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/413/]|




was (Author: zhuzh):
I think I find the cause of the regression.

*Cause*
The regression happens because FLINK-23372 disables slot sharing of batch job 
tasks. And a default MiniCluster would just provide 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the regression was gone. 
And that's why we cannot reproduce the regression by reverting FLINK-23372 on 
latest master.

This also explains that 
- why the regression only happened to {{sortedTwoInput}} and 
{{sortedMultiInput}} and not to {{sortedOneInput}}. 
- why the performance increased on 07-20 and it also only happened to 
{{sortedTwoInput}} and {{sortedMultiInput}}

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396636#comment-17396636
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/11/21, 2:53 AM:
---

I think I found the cause of the regression.

*Cause*
The regression happens because FLINK-23372 disables slot sharing of batch job 
tasks. And a default MiniCluster would just provide 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the regression was gone. 
And that's why we cannot reproduce the regression by reverting FLINK-23372 on 
latest master.

This also explains:
- why the regression only happened to {{sortedTwoInput}} and {{sortedMultiInput}} and not to {{sortedOneInput}};
- why the performance increase on 07-20 also only happened to {{sortedTwoInput}} and {{sortedMultiInput}}.

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 
FLINK-23372|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|
|latest sortedTwoInput on latest 
master|1926.685377|[#413|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/413/]|
|latest sortedTwoInput reverting FLINK-23372 on latest 
master|1938.716479|[#414|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/414/]|




was (Author: zhuzh):
I think I find the cause of the major regression.

*Cause*
The major regression happens because FLINK-23372 disables slot sharing of batch 
job tasks. And a default MiniCluster would just provide 1 task manager with 1 
slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the major regression was 
gone. And that's why we cannot reproduce the obvious regression by reverting 
FLINK-23372 on latest master.

This also explains that 
- why the obvious regression only happened to {{sortedTwoInput}} and 
{{sortedMultiInput}} and not to {{sortedOneInput}}. 
- why the performance increased on 07-20 and it also only happened to 
{{sortedTwoInput}} and {{sortedMultiInput}}

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this major regression is expected and acceptable.

Note that there still seems to be a minor regression(~1%) after FLINK-23372. 
The cause may be the increased overhead on slot allocation or memory 
initialization, as Stephan [commented 
above|https://issues.apache.org/jira/browse/FLINK-23593?focusedCommentId=17393287=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17393287].
 It is also acceptable in my opinion.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396636#comment-17396636
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/11/21, 2:51 AM:
---

I think I found the cause of the major regression.

*Cause*
The major regression happens because FLINK-23372 disables slot sharing of batch 
job tasks. And a default MiniCluster would just provide 1 task manager with 1 
slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the major regression was 
gone. And that's why we cannot reproduce the obvious regression by reverting 
FLINK-23372 on latest master.

This also explains:
- why the obvious regression only happened to {{sortedTwoInput}} and {{sortedMultiInput}} and not to {{sortedOneInput}};
- why the performance increase on 07-20 also only happened to {{sortedTwoInput}} and {{sortedMultiInput}}.

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this major regression is expected and acceptable.

Note that there still seems to be a minor regression (~1%) after FLINK-23372. The cause may be the increased overhead of slot allocation or memory initialization, as Stephan [commented above|https://issues.apache.org/jira/browse/FLINK-23593?focusedCommentId=17393287=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17393287]. It is also acceptable in my opinion.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 
FLINK-23372|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|
|latest sortedTwoInput on latest 
master|1926.685377|[#413|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/413/]|
|latest sortedTwoInput reverting FLINK-23372 on latest 
master|1938.716479|[#414|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/414/]|




was (Author: zhuzh):
I think I find the cause of the regression.

*Cause*
The regression happens because FLINK-23372 disables slot sharing of batch job 
tasks. And a default MiniCluster would just provide 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the regression was gone. 
And that's why we cannot reproduce the regression by reverting FLINK-23372 on 
latest master.

This also explains that 
- why the regression only happened to {{sortedTwoInput}} and 
{{sortedMultiInput}} and not to {{sortedOneInput}}. 
- why the performance increased on 07-20 and it also only happened to 
{{sortedTwoInput}} and {{sortedMultiInput}}

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396636#comment-17396636
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/10/21, 12:09 PM:


I think I found the cause of the regression.

*Cause*
The regression happens because FLINK-23372 disables slot sharing of batch job 
tasks. And a default MiniCluster would just provide 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the regression was gone. 
And that's why we cannot reproduce the regression by reverting FLINK-23372 on 
latest master.

This also explains:
- why the regression only happened to {{sortedTwoInput}} and {{sortedMultiInput}} and not to {{sortedOneInput}};
- why the performance increase on 07-20 also only happened to {{sortedTwoInput}} and {{sortedMultiInput}}.

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 
FLINK-23372|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|




was (Author: zhuzh):
I think I find the cause of the regression.

*Cause*
The regression happens because FLINK-23372 disables slot sharing of batch job 
tasks. And a default MiniCluster would just provide 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. The increased the total execution time and resulted in the 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the regression was gone. 
And that's why we cannot reproduce the regression by reverting FLINK-23372 on 
latest master.

This also explains that 
- why the regression only happened to {{sortedTwoInput}} and 
{{sortedMultiInput}} and not to {{sortedOneInput}}. 
- why the performance increased on 07-20 and it also only happened to 
{{sortedTwoInput}} and {{sortedMultiInput}}

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 
FLINK-23372|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|



> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] 

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396636#comment-17396636
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/10/21, 12:08 PM:


I think I found the cause of the regression.

*Cause*
The regression happens because FLINK-23372 disables slot sharing of batch job 
tasks. And a default MiniCluster would just provide 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the regression was gone. 
And that's why we cannot reproduce the regression by reverting FLINK-23372 on 
latest master.

This also explains:
- why the regression only happened to {{sortedTwoInput}} and {{sortedMultiInput}} and not to {{sortedOneInput}};
- why the performance increase on 07-20 also only happened to {{sortedTwoInput}} and {{sortedMultiInput}}.

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 
FLINK-23372|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|




was (Author: zhuzh):
I think I find the cause of the regression.

*Cause*
The regression happens because FLINK-23372 disables slot sharing of batch job 
tasks. 
And a default MiniCluster would just provide 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. The increased the total execution time and resulted in the 
regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the regression was gone. 
And that's why we cannot reproduce the regression by reverting FLINK-23372 on 
latest master.

This also explains that 
- why the regression only happened to {{sortedTwoInput}} and 
{{sortedMultiInput}} and not to {{sortedOneInput}}. 
- why the performance increased on 07-20 and it also only happened to 
{{sortedTwoInput}} and {{sortedMultiInput}}

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 
FLINK-23372|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|



> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] 

[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396636#comment-17396636
 ] 

Zhu Zhu commented on FLINK-23593:
-

I think I found the cause of the regression.

*Cause*
The regression happens because FLINK-23372 disables slot sharing of batch job 
tasks. 
And a default MiniCluster would just provide 1 task manager with 1 slot.
This means that the two source tasks of {{sortedTwoInput}} were able to run 
simultaneously before FLINK-23372 and had to run sequentially after FLINK-23372 
was merged. This increased the total execution time and resulted in the regression.

Later on 07-20, an 
[improvement|https://github.com/apache/flink-benchmarks/commit/70d9b7b4927fc38ecf0950e55a47325b71e2dd63]
 was made on flink-benchmarks and changed the MiniCluster to be pre-launched 
with 1 task manager with 4 slots. This enabled the two source tasks of 
{{sortedTwoInput}} to run simultaneously again. And the regression was gone. 
And that's why we cannot reproduce the regression by reverting FLINK-23372 on 
latest master.

This also explains:
- why the regression only happened to {{sortedTwoInput}} and {{sortedMultiInput}} and not to {{sortedOneInput}};
- why the performance increase on 07-20 also only happened to {{sortedTwoInput}} and {{sortedMultiInput}}.

*Conclusion*
It is expected that more slots may be needed for a batch job to run tasks 
simultaneously. However, this does not mean more resources are needed because 
theoretically each slot can be smaller because it is no longer shared. 
Therefore, this regression is expected and acceptable.

*Attachment*
||Benchmark||Score||Link||
|07-15 sortedTwoInput before 
FLINK-23372|1904.626380|[#418|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/418/]|
|07-15 sortedTwoInput after 
FLINK-23372|1782.644331|[#419|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/419/]|
|07-20 sortedTwoInput before 
FLINK-23372|1964.448112|[#420|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/420/]|
|07-20 sortedTwoInput after 
FLINK-23372|1944.880662|[#421|http://codespeed.dak8s.net:8080/job/flink-benchmark-request/421/]|



> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the 

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395983#comment-17395983
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/9/21, 11:24 AM:
---

I tried the benchmarks locally before/after applying FLINK-23372 and did not see an obvious regression.
I also tried the benchmarks on commit f4afbf3e7de19ebcc5cb9324a22ba99fcd354dce (last good on the [codespeed|http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2#/?exe=1,3,5=sortedTwoInput=2=200=off=on=on] curve) and eb8100f7afe1cd2b6fceb55b174de097db752fc7 (first bad on the curve) but did not reproduce the regression either.
Maybe it is related to environment differences (e.g. HDD) but I have no idea yet.


was (Author: zhuzh):
I tried the benchmarks locally before/after applying FLINK-23372 and did not 
see obvious regression.
Also tried benchmarks on commit f4afbf3e7de19ebcc5cb9324a22ba99fcd354dce(last 
good on 
[codespeed|http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2#/?exe=1,3,5=sortedTwoInput=2=200=off=on=on]
 curve)  and eb8100f7afe1cd2b6fceb55b174de097db752fc7(first bad on the curve) 
but did not reproduce the regression either. Maybe it's due to HDD but I have 
no idea yet.

> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank 

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393950#comment-17393950
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/9/21, 11:22 AM:
---

>> Could the larger difference between local benchmark vs. cloud be that the 
>> cloud is running with regular HDDs and we always spill to disk because 
>> SORT_SPILLING_THRESHOLD is set to 0?

-Maybe yes. Because the record processing time can be shorter on SSD, the increased initialization time (described in *Trying to explain the Regression*) would be more obvious.-
Ignore the struck-through line above because it is wrong.

Another similar suspicion is that the flink-benchmarks [patch|https://github.com/twalthr/flink-benchmarks/commit/dfe3cad86030b551daaa7c4a5951a6e4c06fc061] increased `RECORDS_PER_INVOCATION` from 1_500_000 to 3_000_000. The increased processing time may make the regression in initialization time less obvious.
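
To illustrate why that constant matters, here is a hedged JMH-style sketch (class and method names are placeholders, not the actual flink-benchmarks code): the constant sets the per-invocation work, so a fixed one-off cost such as job or slot initialization weighs relatively less once the constant is doubled.

{code:java}
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;

public class SortedTwoInputSketch {
    // doubled from 1_500_000 by the benchmark patch mentioned above
    public static final int RECORDS_PER_INVOCATION = 3_000_000;

    @Benchmark
    @OperationsPerInvocation(RECORDS_PER_INVOCATION)
    public void sortedTwoInput() {
        // build and execute the two-input job that processes
        // RECORDS_PER_INVOCATION records per invocation
    }
}
{code}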


was (Author: zhuzh):
>> Could the larger difference between local benchmark vs. cloud be that the 
>> cloud is running with regular HDDs and we always spill to disk because 
>> SORT_SPILLING_THRESHOLD is set to 0?

Maybe yes. Because the record processing time can be shorter on SSD and the 
increased initialization time(described in *Trying to explain the Regression*) 
will be more obvious.

Another similar suspicion is that the flink-benchmark 
[patch|https://github.com/twalthr/flink-benchmarks/commit/dfe3cad86030b551daaa7c4a5951a6e4c06fc061]
 increased `RECORDS_PER_INVOCATION` from 1_500_000 to 3_000_000. This increased 
processing time and may make the regression on initialization time less obvious.

> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 

[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395983#comment-17395983
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/9/21, 11:20 AM:
---

I tried the benchmarks locally before/after applying FLINK-23372 and did not see an obvious regression.
I also tried the benchmarks on commit f4afbf3e7de19ebcc5cb9324a22ba99fcd354dce (last good on the [codespeed|http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2#/?exe=1,3,5=sortedTwoInput=2=200=off=on=on] curve) and eb8100f7afe1cd2b6fceb55b174de097db752fc7 (first bad on the curve) but did not reproduce the regression either. Maybe it is due to the HDD, but I have no idea yet.


was (Author: zhuzh):
I tried the benchmarks locally before/after applying FLINK-23372 and did not 
see obvious regression.
Also tried benchmarks on commit f4afbf3e7de19ebcc5cb9324a22ba99fcd354dce(last 
good on 
[codespeed|http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2#/?exe=1,3,5=sortedTwoInput=2=200=off=on=on]
 curve)  and eb8100f7afe1cd2b6fceb55b174de097db752fc7(first bad on the curve) 
but did not reproduce the regression either. Maybe it's due to HDD but I have 
no idea yet.







> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions 

[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395983#comment-17395983
 ] 

Zhu Zhu commented on FLINK-23593:
-

I tried the benchmarks locally before/after applying FLINK-23372 and did not see an obvious regression.
I also tried the benchmarks on commit f4afbf3e7de19ebcc5cb9324a22ba99fcd354dce (last good on the [codespeed|http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2#/?exe=1,3,5=sortedTwoInput=2=200=off=on=on] curve) and eb8100f7afe1cd2b6fceb55b174de097db752fc7 (first bad on the curve) but did not reproduce the regression either. Maybe it is due to the HDD, but I have no idea yet.







> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time

2021-08-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-16069.
---
Resolution: Duplicate

> Creation of TaskDeploymentDescriptor can block main thread for long time
> 
>
> Key: FLINK-16069
> URL: https://issues.apache.org/jira/browse/FLINK-16069
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: huweihua
>Priority: Major
> Attachments: FLINK-16069-POC-results, batch.png, streaming.png
>
>
> Deploying tasks can take a long time when we submit a high-parallelism job. 
> Execution#deploy runs in the main thread, so it blocks the JobMaster from 
> processing other akka messages, such as heartbeats. The creation of the 
> TaskDeploymentDescriptor takes most of that time; we can move the creation into a future.
> For example, for a job [source(8000)->sink(8000)], moving the 16000 tasks from 
> SCHEDULED to DEPLOYING took more than 1 minute. This caused the TaskManager 
> heartbeat to time out and the job never succeeded.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time

2021-08-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395728#comment-17395728
 ] 

Zhu Zhu commented on FLINK-16069:
-

Thanks for making the improvements and sharing the results! [~Thesharing]
The attached graph shows the improvement in TaskDeploymentDescriptor creation, and the table shows the end-to-end (E2E) deployment improvement.
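
For readers following along, here is a hedged sketch of the idea behind this ticket, not the actual Flink implementation: build the heavyweight TaskDeploymentDescriptor on an I/O executor and only switch back to the JobMaster main thread to send the deploy RPC, so heartbeats and other akka messages are not blocked. All names below (AsyncTddSketch, createTdd, sendDeployRpc, the executors) are illustrative placeholders.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class AsyncTddSketch {
    // placeholder executors: a pool for heavy TDD creation, and a single thread
    // standing in for the JobMaster main-thread executor
    private final Executor ioExecutor = Executors.newFixedThreadPool(4);
    private final Executor mainThreadExecutor = Executors.newSingleThreadExecutor();

    public CompletableFuture<Void> deploy(String executionId) {
        return CompletableFuture
                .supplyAsync(() -> createTdd(executionId), ioExecutor)     // heavy serialization off the main thread
                .thenAcceptAsync(this::sendDeployRpc, mainThreadExecutor); // RPC submission back on the main thread
    }

    private byte[] createTdd(String executionId) {
        return new byte[0]; // placeholder for building and serializing the TDD
    }

    private void sendDeployRpc(byte[] serializedTdd) {
        // placeholder for the submitTask RPC to the TaskManager
    }
}
{code}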

> Creation of TaskDeploymentDescriptor can block main thread for long time
> 
>
> Key: FLINK-16069
> URL: https://issues.apache.org/jira/browse/FLINK-16069
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: huweihua
>Priority: Major
> Attachments: FLINK-16069-POC-results, batch.png, streaming.png
>
>
> Deploying tasks can take a long time when we submit a high-parallelism job. 
> Execution#deploy runs in the main thread, so it blocks the JobMaster from 
> processing other akka messages, such as heartbeats. The creation of the 
> TaskDeploymentDescriptor takes most of that time; we can move the creation into a future.
> For example, for a job [source(8000)->sink(8000)], moving the 16000 tasks from 
> SCHEDULED to DEPLOYING took more than 1 minute. This caused the TaskManager 
> heartbeat to time out and the job never succeeded.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23593) Performance regression on 15.07.2021

2021-08-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393950#comment-17393950
 ] 

Zhu Zhu edited comment on FLINK-23593 at 8/5/21, 12:19 PM:
---

>> Could the larger difference between local benchmark vs. cloud be that the 
>> cloud is running with regular HDDs and we always spill to disk because 
>> SORT_SPILLING_THRESHOLD is set to 0?

Maybe yes. Because the record processing time can be shorter on SSD, the increased initialization time (described in *Trying to explain the Regression*) would be more obvious.

Another similar suspicion is that the flink-benchmarks [patch|https://github.com/twalthr/flink-benchmarks/commit/dfe3cad86030b551daaa7c4a5951a6e4c06fc061] increased `RECORDS_PER_INVOCATION` from 1_500_000 to 3_000_000. The increased processing time may make the regression in initialization time less obvious.


was (Author: zhuzh):
>> Could the larger difference between local benchmark vs. cloud be that the 
>> cloud is running with regular HDDs and we always spill to disk because 
>> SORT_SPILLING_THRESHOLD is set to 0?

Maybe yes. Because the record processing time can be shorter on SSD and the 
increased initialization time(described in *Trying to explain the Regression*) 
will be more obvious.

Another similar suspicion is that the flink-benchmark 
[patch|https://github.com/twalthr/flink-benchmarks/commit/dfe3cad86030b551daaa7c4a5951a6e4c06fc061]
 increased `RECORDS_PER_INVOCATION` from 1_500_000 to 3_000_000. This increased 
processing time may make the regression on initialization time less obvious.

> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> 

[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393950#comment-17393950
 ] 

Zhu Zhu commented on FLINK-23593:
-

>> Could the larger difference between local benchmark vs. cloud be that the 
>> cloud is running with regular HDDs and we always spill to disk because 
>> SORT_SPILLING_THRESHOLD is set to 0?

Maybe yes. The record processing time can be shorter on SSD, so the 
increased initialization time (described in *Trying to explain the Regression*) 
will be more obvious.

Another similar suspicion is that the flink-benchmark 
[patch|https://github.com/twalthr/flink-benchmarks/commit/dfe3cad86030b551daaa7c4a5951a6e4c06fc061]
 increased `RECORDS_PER_INVOCATION` from 1_500_000 to 3_000_000. The increased 
processing time may make the regression on initialization time less obvious.

> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23172) Links to Task Failure Recovery page on Configuration page are broken

2021-08-05 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23172:

Affects Version/s: 1.13.2

> Links to Task Failure Recovery page on Configuration page are broken
> 
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The links to [Task Failure 
> Recovery|https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/]
>  page inside [Fault 
> Tolerance|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance]
>  section and [Advanced Fault Tolerance 
> Options|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options]
>  section on the 
> [Configuration|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  page are broken.
> Let's take an example. In the description of {{restart-strategy}}, currently 
> the link for {{fixed-delay}} refers to 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy],
>  which doesn't exist and would lead to a 404 error. The correct link is 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy].
> The links are located in {{RestartStrategyOptions.java}} and 
> {{JobManagerOptions.java}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393836#comment-17393836
 ] 

Zhu Zhu commented on FLINK-23593:
-

Thanks for the updates! [~sewen]

I think your guess about *Trying to explain the Regression* may be right.
However, it looks a bit weird to me that there is a ~10% regression of 
[sortedTwoInput|http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput], 
while Timo's benchmark shows that enabling slot sharing or not results in only 
a ~2% performance difference. So I'm a bit concerned that there may be 
unexpected problems.

I will try these benchmarks locally to see if I can find anything.


> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22674) Provide JobID when apply shuffle resource by ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22674:

Fix Version/s: 1.14.0

> Provide JobID when apply shuffle resource by ShuffleMaster
> --
>
> Key: FLINK-22674
> URL: https://issues.apache.org/jira/browse/FLINK-22674
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In the current Flink 'pluggable shuffle service' framework, only 
> PartitionDescriptor and ProducerDescriptor are included as parameters in 
> ShuffleMaster#registerPartitionWithProducer.
> However, when extending a remote shuffle service based on the 'pluggable shuffle 
> service', the JobID is also needed when applying for shuffle resources from the remote 
> cluster. It can be used as an identifier to link shuffle resources with 
> the corresponding job:
>  # The remote shuffle cluster can isolate or apply capacity control to shuffle 
> resources between jobs;
>  # The remote shuffle cluster can use the JobID to clean up shuffle data when a job 
> is lost, thus avoiding file leaks.
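A rough sketch of the proposed parameter, using stand-in types instead of Flink's real JobID and descriptor classes; it only illustrates the shape of the proposal, not the final signature.

{code:java}
import java.util.concurrent.CompletableFuture;

// Stand-ins for Flink's JobID, PartitionDescriptor, ProducerDescriptor and ShuffleDescriptor.
final class JobIdStub {}
final class PartitionDescriptorStub {}
final class ProducerDescriptorStub {}
final class ShuffleDescriptorStub {}

interface ShuffleMasterSketch {
    // The proposal: pass the job id along with the existing descriptors so a remote
    // shuffle cluster can attribute the requested resources to a job and clean them up later.
    CompletableFuture<ShuffleDescriptorStub> registerPartitionWithProducer(
            JobIdStub jobId,
            PartitionDescriptorStub partitionDescriptor,
            ProducerDescriptorStub producerDescriptor);
}
{code}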



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23214:

Fix Version/s: 1.14.0

> Make ShuffleMaster a cluster level shared service
> -
>
> Key: FLINK-23214
> URL: https://issues.apache.org/jira/browse/FLINK-23214
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> This ticket tries to make ShuffleMaster a cluster level shared service which 
> makes it consistent with the ShuffleEnvironment at the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23249) Introduce ShuffleMasterContext to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23249.
---
Resolution: Done

Done via 0ee4038ef596b22630bc814f677fa489d3796241

> Introduce ShuffleMasterContext to ShuffleMaster
> ---
>
> Key: FLINK-23249
> URL: https://issues.apache.org/jira/browse/FLINK-23249
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Introduce ShuffleMasterContext to ShuffleMaster. Just like the 
> ShuffleEnvironmentContext at the TaskManager side, the ShuffleMasterContext 
> can act as a proxy of ShuffleMaster and other components of Flink like the 
> ResourceManagerPartitionTracker.
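A hypothetical sketch of what such a context could expose; the method names here are assumptions for illustration, not the actual Flink interface.

{code:java}
import java.util.Map;

// Hypothetical context handed to the ShuffleMaster, acting as a proxy to cluster-side
// services (configuration, fatal error handling, partition tracking callbacks, ...).
interface ShuffleMasterContextSketch {

    // cluster configuration the shuffle master may read at startup
    Map<String, String> getConfiguration();

    // report an unrecoverable error so the cluster entrypoint can react to it
    void onFatalError(Throwable cause);
}
{code}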



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22675) Add lifecycle methods to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22675:

Fix Version/s: 1.14.0

> Add lifecycle methods to ShuffleMaster
> --
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
> Fix For: 1.14.0
>
>
> When extending a remote shuffle service based on the 'pluggable shuffle service', 
> the ShuffleMaster talks with the remote cluster over a network connection. This Jira 
> proposes to add an interface method, ShuffleMaster#close, which can be 
> overridden to do cleanup work and will be called when the Flink application is 
> closed.
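A small sketch of how a remote shuffle implementation might use such a close() hook; the client interface below is hypothetical and only illustrates the intended cleanup.

{code:java}
// Hypothetical client of a remote shuffle cluster.
interface RemoteShuffleClient {
    void releaseAllResources();
    void disconnect();
}

class RemoteShuffleMasterSketch implements AutoCloseable {

    private final RemoteShuffleClient client;

    RemoteShuffleMasterSketch(RemoteShuffleClient client) {
        this.client = client;
    }

    // Would be invoked when the Flink application is closed, so the remote cluster
    // can free its shuffle resources and the network connection can be torn down.
    @Override
    public void close() {
        client.releaseAllResources();
        client.disconnect();
    }
}
{code}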



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23214.
---
Resolution: Done

Done via 81e1db3c439c1758dccd1a20f2f6b70120f48ef7

> Make ShuffleMaster a cluster level shared service
> -
>
> Key: FLINK-23214
> URL: https://issues.apache.org/jira/browse/FLINK-23214
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
>
> This ticket tries to make ShuffleMaster a cluster level shared service which 
> makes it consistent with the ShuffleEnvironment at the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22674) Provide JobID when apply shuffle resource by ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22674.
---
Resolution: Done

Done via 6bc8399e7f1738ec22cb1082c096269b5106cee5

> Provide JobID when apply shuffle resource by ShuffleMaster
> --
>
> Key: FLINK-22674
> URL: https://issues.apache.org/jira/browse/FLINK-22674
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
>
> In the current Flink 'pluggable shuffle service' framework, only 
> PartitionDescriptor and ProducerDescriptor are included as parameters in 
> ShuffleMaster#registerPartitionWithProducer.
> However, when extending a remote shuffle service based on the 'pluggable shuffle 
> service', the JobID is also needed when applying for shuffle resources from the remote 
> cluster. It can be used as an identifier to link shuffle resources with 
> the corresponding job:
>  # The remote shuffle cluster can isolate or apply capacity control to shuffle 
> resources between jobs;
>  # The remote shuffle cluster can use the JobID to clean up shuffle data when a job 
> is lost, thus avoiding file leaks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22675) Add lifecycle methods to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22675:
---

Assignee: Yingjie Cao  (was: Zhu Zhu)

> Add lifecycle methods to ShuffleMaster
> --
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>
> When extending a remote shuffle service based on the 'pluggable shuffle service', 
> the ShuffleMaster talks with the remote cluster over a network connection. This Jira 
> proposes to add an interface method, ShuffleMaster#close, which can be 
> overridden to do cleanup work and will be called when the Flink application is 
> closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22675) Add lifecycle methods to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22675.
---
Resolution: Done

Done via 80df36b51af791f67126e07e182015ea6ea73fd2

> Add lifecycle methods to ShuffleMaster
> --
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>
> When extending a remote shuffle service based on the 'pluggable shuffle service', 
> the ShuffleMaster talks with the remote cluster over a network connection. This Jira 
> proposes to add an interface method, ShuffleMaster#close, which can be 
> overridden to do cleanup work and will be called when the Flink application is 
> closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22674) Provide JobID when apply shuffle resource by ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22674:
---

Assignee: Yingjie Cao

> Provide JobID when apply shuffle resource by ShuffleMaster
> --
>
> Key: FLINK-22674
> URL: https://issues.apache.org/jira/browse/FLINK-22674
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
>
> In the current Flink 'pluggable shuffle service' framework, only 
> PartitionDescriptor and ProducerDescriptor are included as parameters in 
> ShuffleMaster#registerPartitionWithProducer.
> However, when extending a remote shuffle service based on the 'pluggable shuffle 
> service', the JobID is also needed when applying for shuffle resources from the remote 
> cluster. It can be used as an identifier to link shuffle resources with 
> the corresponding job:
>  # The remote shuffle cluster can isolate or apply capacity control to shuffle 
> resources between jobs;
>  # The remote shuffle cluster can use the JobID to clean up shuffle data when a job 
> is lost, thus avoiding file leaks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22675) Add lifecycle methods to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22675:
---

Assignee: Zhu Zhu

> Add lifecycle methods to ShuffleMaster
> --
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>
> When extending a remote shuffle service based on the 'pluggable shuffle service', 
> the ShuffleMaster talks with the remote cluster over a network connection. This Jira 
> proposes to add an interface method, ShuffleMaster#close, which can be 
> overridden to do cleanup work and will be called when the Flink application is 
> closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23214:
---

Assignee: Yingjie Cao

> Make ShuffleMaster a cluster level shared service
> -
>
> Key: FLINK-23214
> URL: https://issues.apache.org/jira/browse/FLINK-23214
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
>
> This ticket tries to make ShuffleMaster a cluster level shared service which 
> makes it consistent with the ShuffleEnvironment at the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23249) Introduce ShuffleMasterContext to ShuffleMaster

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23249:
---

Assignee: Yingjie Cao

> Introduce ShuffleMasterContext to ShuffleMaster
> ---
>
> Key: FLINK-23249
> URL: https://issues.apache.org/jira/browse/FLINK-23249
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Introduce ShuffleMasterContext to ShuffleMaster. Just like the 
> ShuffleEnvironmentContext at the TaskManager side, the ShuffleMasterContext 
> can act as a proxy of ShuffleMaster and other components of Flink like the 
> ResourceManagerPartitionTracker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22910) Refine ShuffleMaster lifecycle management for pluggable shuffle service framework

2021-08-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22910:
---

Assignee: Yingjie Cao

> Refine ShuffleMaster lifecycle management for pluggable shuffle service 
> framework
> -
>
> Key: FLINK-22910
> URL: https://issues.apache.org/jira/browse/FLINK-22910
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The current _ShuffleMaster_ has an unclear lifecycle which is inconsistent 
> with the _ShuffleEnvironment_ at the _TM_ side. Besides, it is hard to 
> implement some important capabilities for a remote shuffle service, for 
> example: 1) releasing external resources when a job finishes; 2) stopping or starting to 
> track some partitions depending on the status of the external service or 
> system.
> We drafted a document[1] which proposes some simple changes to solve these 
> issues. The document is not wholly complete yet. We will start a 
> discussion once it is finished.
>  
> [1] 
> https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23590) StreamTaskTest#testProcessWithUnAvailableInput is flaky

2021-08-03 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17392737#comment-17392737
 ] 

Zhu Zhu commented on FLINK-23590:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21450=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7

> StreamTaskTest#testProcessWithUnAvailableInput is flaky
> ---
>
> Key: FLINK-23590
> URL: https://issues.apache.org/jira/browse/FLINK-23590
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.14.0
>Reporter: David Morávek
>Assignee: Anton Kalashnikov
>Priority: Critical
> Fix For: 1.14.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21218=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb]
>  
> {code:java}
> java.lang.AssertionError: 
> Expected: a value equal to or greater than <22L>
>  but: <217391L> was less than <22L>   at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:964)
>   at org.junit.Assert.assertThat(Assert.java:930)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTest.testProcessWithUnAvailableInput(StreamTaskTest.java:1561)
>   at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:829){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23172) Links to Task Failure Recovery page on Configuration page are broken

2021-08-03 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23172.
---
Resolution: Fixed

Fixed via 
5183b2af9d467708725bd1454a671bc7689159a5
46bf6d68ee97684949ba3ad38dc18ff7c800092a

> Links to Task Failure Recovery page on Configuration page are broken
> 
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The links to [Task Failure 
> Recovery|https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/]
>  page inside [Fault 
> Tolerance|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance]
>  section and [Advanced Fault Tolerance 
> Options|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options]
>  section on the 
> [Configuration|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  page are broken.
> Let's take an example. In the description of {{restart-strategy}}, currently 
> the link for {{fixed-delay}} refers to 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy],
>  which doesn't exist and would lead to a 404 error. The correct link is 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy].
> The links are located in {{RestartStrategyOptions.java}} and 
> {{JobManagerOptions.java}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22767) Optimize the initialization of LocalInputPreferredSlotSharingStrategy

2021-08-03 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22767:
---

Assignee: Zhilong Hong

> Optimize the initialization of LocalInputPreferredSlotSharingStrategy
> -
>
> Key: FLINK-22767
> URL: https://issues.apache.org/jira/browse/FLINK-22767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
>
> Based on the scheduler benchmark introduced in FLINK-21731, we find that 
> during the initialization of {{LocalInputPreferredSlotSharingStrategy}}, 
> there's a procedure that has O(N^2) complexity: 
> {{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}
>  located in {{LocalInputPreferredSlotSharingStrategy}}.
> The original implementation is: 
> {code:java}
> for all SchedulingExecutionVertex in DefaultScheduler:
>   for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
> get the result partition's producer vertex and check whether the 
> ExecutionSlotSharingGroup where the producer vertex is located is available for 
> the current vertex{code}
> This procedure has O(N^2) complexity.
> It's obvious that the result partitions in the same ConsumedPartitionGroup 
> have the same producer vertex. So we can just iterate over the 
> ConsumedPartitionGroups instead of all the consumed partitions. This will 
> decrease the complexity from O(N^2) to O(N).
> The optimization of this procedure will speed up the initialization of 
> DefaultScheduler. It will accelerate the submission of a new job, especially 
> for OLAP jobs.
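A simplified sketch of the proposed iteration, using stand-in types; the point is that one lookup per ConsumedPartitionGroup replaces one lookup per consumed partition, which is what brings the complexity down to O(N).

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Stand-in types; the real logic lives in ExecutionSlotSharingGroupBuilder.
record VertexId(String id) {}
record ConsumedPartitionGroup(VertexId producerVertex) {}

class ProducerGroupLookupSketch {

    // producer vertex -> ExecutionSlotSharingGroup (represented here as a String id)
    private final Map<VertexId, String> producerGroups;

    ProducerGroupLookupSketch(Map<VertexId, String> producerGroups) {
        this.producerGroups = producerGroups;
    }

    // The result partitions in one ConsumedPartitionGroup share the same producer, so a
    // single check per group is enough, instead of one check per consumed partition.
    Optional<String> findAvailableProducerGroup(List<ConsumedPartitionGroup> consumedGroups) {
        for (ConsumedPartitionGroup group : consumedGroups) {
            String candidate = producerGroups.get(group.producerVertex());
            if (candidate != null) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
{code}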



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23599) Remove JobVertex#connectIdInput

2021-08-03 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23599.
---
Resolution: Done

Done via ec9ff1ee5e33529260d6a3adfad4b0b34efde55e

> Remove JobVertex#connectIdInput
> ---
>
> Key: FLINK-23599
> URL: https://issues.apache.org/jira/browse/FLINK-23599
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> {{JobVertex#connectIdInput}} is not used in production anymore. It's only 
> used in the unit tests {{testAttachViaIds}} and 
> {{testCannotConnectMissingId}} located in 
> {{DefaultExecutionGraphConstructionTest}}. However, these two test cases are 
> designed to test this method. Therefore, this method and its test cases can 
> be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23599) Remove JobVertex#connectIdInput

2021-08-03 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23599:
---

Assignee: Zhilong Hong

> Remove JobVertex#connectIdInput
> ---
>
> Key: FLINK-23599
> URL: https://issues.apache.org/jira/browse/FLINK-23599
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> {{JobVertex#connectIdInput}} is not used in production anymore. It's only 
> used in the unit tests {{testAttachViaIds}} and 
> {{testCannotConnectMissingId}} located in 
> {{DefaultExecutionGraphConstructionTest}}. However, these two test cases are 
> designed to test this method. Therefore, this method and its test cases can 
> be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23593) Performance regression on 15.07.2021

2021-08-03 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17392091#comment-17392091
 ] 

Zhu Zhu commented on FLINK-23593:
-

I'd like to understand why the regression happens due to FLINK-23372 before 
deciding whether it is really a problem.
I do not know much about these two benchmarks. IIUC, the mentioned benchmarks 
are for batch jobs only.
After FLINK-23372, batch job upstream and downstream tasks no longer share 
slots. So maybe the reason is that upstream and downstream tasks are deployed 
to different task managers and cannot send data via local input channels?

[~twalthr] do you think this is the cause? or any other suspicions?


> Performance regression on 15.07.2021
> 
>
> Key: FLINK-23593
> URL: https://issues.apache.org/jira/browse/FLINK-23593
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Benchmarks
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23354) Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor

2021-07-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23354.
---
Resolution: Done

Done via 5c475d41fea3c81557e0d463bed1c94024dd0da5

> Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor
> --
>
> Key: FLINK-23354
> URL: https://issues.apache.org/jira/browse/FLINK-23354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 3 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005. 
> For more details about part 2, please see FLINK-23218._
> Currently a TaskExecutor uses the BlobCache to cache the blobs transported from 
> the JobManager. The caches are local files stored on the TaskExecutor. The 
> blob cache is not cleaned up until one hour after the related job has 
> finished. In FLINK-23218, we are going to distribute the cached 
> ShuffleDescriptors via blobs. When a large number of failovers happen, many 
> cache entries will be stored on the local disk. The blob cache will then occupy a large 
> amount of disk space; in extreme cases, the blobs could fill up the disk.
> So we need to add a size limit for the ShuffleDescriptors stored in the 
> PermanentBlobCache on the TaskExecutor, as described in the comments of 
> FLINK-23218. The main idea is to add a size limit and delete blobs in 
> LRU order once the limit is exceeded. Before a blob item is cached, the 
> TaskExecutor will first check the overall size of the cache. If the overall 
> size exceeds the limit, blobs will be deleted in LRU order until the limit 
> is no longer exceeded. If a deleted blob cache entry is needed 
> afterwards, it will be downloaded again from HA storage or the blob server.
> The default value of the size limit for the ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor will be 100 MiB.
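A minimal sketch of the described LRU eviction, assuming a simple in-memory size tracker; the actual logic lives in the TaskExecutor's PermanentBlobCache and also deletes the cached files on disk.

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class SizeLimitedBlobTrackerSketch {

    private final long sizeLimitBytes; // e.g. 100 MiB by default
    private long currentSizeBytes;
    // access-ordered map: iteration order is least-recently-used first
    private final LinkedHashMap<String, Long> blobSizes = new LinkedHashMap<>(16, 0.75f, true);

    SizeLimitedBlobTrackerSketch(long sizeLimitBytes) {
        this.sizeLimitBytes = sizeLimitBytes;
    }

    // Called after a blob has been cached; evicts least-recently-used blobs until
    // the overall size is within the limit again.
    void onBlobCached(String blobKey, long sizeBytes) {
        Long previous = blobSizes.put(blobKey, sizeBytes);
        currentSizeBytes += sizeBytes - (previous == null ? 0 : previous);

        Iterator<Map.Entry<String, Long>> it = blobSizes.entrySet().iterator();
        while (currentSizeBytes > sizeLimitBytes && it.hasNext()) {
            Map.Entry<String, Long> leastRecentlyUsed = it.next();
            currentSizeBytes -= leastRecentlyUsed.getValue();
            it.remove(); // an evicted blob is re-downloaded from HA / the blob server if needed
        }
    }
}
{code}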



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-07-28 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17389206#comment-17389206
 ] 

Zhu Zhu commented on FLINK-23172:
-

Thanks for reporting this problem! [~Thesharing]
I have assigned you this ticket.

> Links of restart strategy in configuration page is broken
> -
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The links in the Fault Tolerance section of [the configuration 
> page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  are broken. Currently the link refers to 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
>  which doesn't exist and would lead to a 404 error. The correct link is 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-07-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23172:
---

Assignee: Zhilong Hong

> Links of restart strategy in configuration page is broken
> -
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The links in the Fault Tolerance section of [the configuration 
> page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  are broken. Currently the link refers to 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
>  which doesn't exist and would lead to a 404 error. The correct link is 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22773) Optimize the construction of pipelined regions

2021-07-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22773:
---

Assignee: Zhilong Hong

> Optimize the construction of pipelined regions
> --
>
> Key: FLINK-22773
> URL: https://issues.apache.org/jira/browse/FLINK-22773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> During the initialization of DefaultExecutionTopology, pipelined regions will 
> be computed for scheduling. Currently the complexity of this procedure is 
> O(N^2):
>  
> {code:java}
> for all vertices in the topology:
>   for all consumed results of the vertex:
> if the consumed result is reconnectable:
>   merge the current region with its producer region
> {code}
> One possible solution is mentioned in FLINK-17330.
> If we can optimize this procedure from O(N^2) to O(N), it will speed up the 
> initialization of SchedulerNG, and accelerate the submission of a new job, 
> especially for OLAP jobs.
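One way to avoid the quadratic merging is a union-find over the region representatives, sketched below with plain vertex ids; this only illustrates the direction hinted at by FLINK-17330, not the actual implementation.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Union-find over vertex ids: merging two regions becomes near-constant amortized cost,
// so building all pipelined regions stays close to linear in the number of edges.
class RegionUnionFindSketch {

    private final Map<String, String> parent = new HashMap<>();

    String find(String vertexId) {
        parent.putIfAbsent(vertexId, vertexId);
        String root = parent.get(vertexId);
        if (!root.equals(vertexId)) {
            root = find(root);
            parent.put(vertexId, root); // path compression
        }
        return root;
    }

    // Called for every reconnectable (pipelined) edge between a producer and a consumer.
    void mergeRegions(String producerVertexId, String consumerVertexId) {
        String producerRoot = find(producerVertexId);
        String consumerRoot = find(consumerVertexId);
        if (!producerRoot.equals(consumerRoot)) {
            parent.put(producerRoot, consumerRoot);
        }
    }
}
{code}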



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22773) Optimize the construction of pipelined regions

2021-07-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22773:

Fix Version/s: 1.14.0

> Optimize the construction of pipelined regions
> --
>
> Key: FLINK-22773
> URL: https://issues.apache.org/jira/browse/FLINK-22773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> During the initialization of DefaultExecutionTopology, pipelined regions will 
> be computed for scheduling. Currently the complexity of this procedure is 
> O(N^2):
>  
> {code:java}
> for all vertices in the topology:
>   for all consumed results of the vertex:
> if the consumed result is reconnectable:
>   merge the current region with its producer region
> {code}
> One possible solution is mentioned in FLINK-17330.
> If we can optimize this procedure from O(N^2) to O(N), it will speed up the 
> initialization of SchedulerNG, and accelerate the submission of a new job, 
> especially for OLAP jobs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23354) Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor

2021-07-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23354:
---

Assignee: Zhilong Hong

> Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor
> --
>
> Key: FLINK-23354
> URL: https://issues.apache.org/jira/browse/FLINK-23354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 3 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005. 
> For more details about part 2, please see FLINK-23218._
> Currently a TaskExecutor uses the BlobCache to cache the blobs transported from 
> the JobManager. The caches are local files stored on the TaskExecutor. The 
> blob cache is not cleaned up until one hour after the related job has 
> finished. In FLINK-23218, we are going to distribute the cached 
> ShuffleDescriptors via blobs. When a large number of failovers happen, many 
> cache entries will be stored on the local disk. The blob cache will then occupy a large 
> amount of disk space; in extreme cases, the blobs could fill up the disk.
> So we need to add a size limit for the ShuffleDescriptors stored in the 
> PermanentBlobCache on the TaskExecutor, as described in the comments of 
> FLINK-23218. The main idea is to add a size limit and delete blobs in 
> LRU order once the limit is exceeded. Before a blob item is cached, the 
> TaskExecutor will first check the overall size of the cache. If the overall 
> size exceeds the limit, blobs will be deleted in LRU order until the limit 
> is no longer exceeded. If a deleted blob cache entry is needed 
> afterwards, it will be downloaded again from HA storage or the blob server.
> The default value of the size limit for the ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor will be 100 MiB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23402) Expose a consistent GlobalDataExchangeMode

2021-07-28 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1730#comment-1730
 ] 

Zhu Zhu commented on FLINK-23402:
-

+1 for option #2 to rename {{ShuffleMode}} as well as the corresponding config 
option. The config option will always be required for SQL 
jobs (as a replacement of {{table.exec.shuffle-mode}}), and we cannot prevent 
users from setting the execution mode and shuffle mode in invalid combinations.

Agreed that {{AUTOMATIC}} does not make too much sense then, and a proper 
default value of the config option would be good enough. So I think we can drop 
it, keep only {{ALL_EXCHANGES_PIPELINED}} and {{ALL_EXCHANGES_BLOCKING}} for 
the moment, and use {{ALL_EXCHANGES_BLOCKING}} as the default value. 
Later, after we have implemented the proposed global shuffle mode in 
FLINK-23470, we can add it with an explicit name 
({{INTRA_SLOT_EXCHANGES_PIPELINED}} maybe) to the accepted values of the config 
option and make it the default value.
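For context, a small example of how the shuffle mode is configured through the existing Table API option today; the option key and value name are taken from this discussion, and the exact accepted strings may differ per Flink version.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShuffleModeConfigExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Force all exchanges to be blocking so batch pipelines can run with limited resources.
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.shuffle-mode", "ALL_EDGES_BLOCKING");
    }
}
{code}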

> Expose a consistent GlobalDataExchangeMode
> --
>
> Key: FLINK-23402
> URL: https://issues.apache.org/jira/browse/FLINK-23402
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The Table API makes the {{GlobalDataExchangeMode}} configurable via 
> {{table.exec.shuffle-mode}}.
> In Table API batch mode the StreamGraph is configured with 
> {{ALL_EDGES_BLOCKING}} and in DataStream API batch mode 
> {{FORWARD_EDGES_PIPELINED}}.
> I would vote for unifying the exchange mode of both APIs so that complex SQL 
> pipelines behave identically in {{StreamTableEnvironment}} and 
> {{TableEnvironment}}. Also, the feedback I got so far would make 
> {{ALL_EDGES_BLOCKING}} a safer option to run pipelines successfully with 
> limited resources.
> [~lzljs3620320]
> {quote}
> The previous history was like this:
> - The default value is pipeline, and we find that many times due to 
> insufficient resources, the deployment will hang. And the typical use of 
> batch jobs is small resources running large parallelisms, because in batch 
> jobs, the granularity of failover is related to the amount of data processed 
> by a single task. The smaller the amount of data, the faster the fault 
> tolerance. So most of the scenarios are run with small resources and large 
> parallelisms, little by little slowly running.
> - Later, we switched the default value to blocking. We found that the better 
> blocking shuffle implementation would not slow down the running speed much. 
> We tested tpc-ds and it took almost the same time.
> {quote}
> [~dwysakowicz]
> {quote}
> I don't see a problem with changing the default value for DataStream batch 
> mode if you think ALL_EDGES_BLOCKING is the better default option.
> {quote}
> In any case, we should make this configurable for DataStream API users and 
> make the specific Table API option obsolete.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23005) Cache the compressed serialized value of ShuffleDescriptors

2021-07-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23005.
---
Resolution: Done

Done via
a3f72f20acd4df1dbdc61e145d1d932f61ca63f8
6812d18c358ce007a5cbcd685f32f59c70b03a49
b0d34f819804269b0d980b6747db765133849fe4

> Cache the compressed serialized value of ShuffleDescriptors
> ---
>
> Key: FLINK-23005
> URL: https://issues.apache.org/jira/browse/FLINK-23005
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> vertex has 8k parallelism and they are connected with an all-to-all edge. It 
> takes 32.611s to deploy all the tasks and make them transition to running. If 
> the parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other akka messages, 
> such as heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
>  
> The optimization contains three parts:
> 1. Cache the compressed serialized value of ShuffleDescriptors
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors for _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors for _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original object. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these serialized 
> values can be compressed.
> 2. Distribute the ShuffleDescriptors via blob server (see FLINK-23218)
> 3. Limit the size of ShuffleDescriptors in PermanentBlobCache on TaskExecutor 
> (see FLINK-23354)
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the performance 
> of our optimization. We choose the streaming job in the experiment because no 
> task will be running until all tasks are deployed. This avoids other 
> disturbing factors. The job contains two vertices: a source and a sink. They 
> are connected with an all-to-all edge.
> The results illustrated below are the time interval between the timestamp of 
> the first task that transitions to _deploying_ and the timestamp of the last 
> task that transitions to _running_:
> ||Parallelism||Before||After ||
> |8000*8000|32.611s|6.480s|
> |16000*16000|128.408s|19.051s|
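
For illustration, a minimal sketch of the caching idea in part 1 above: serialize and compress the shuffle descriptors of an intermediate result once, then reuse the cached bytes for every downstream task. The class and method names below are stand-ins, not Flink's actual classes.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.DeflaterOutputStream;

// Illustrative sketch only: compress the serialized shuffle descriptors of an
// intermediate result once and reuse the cached bytes for every downstream task,
// instead of recomputing and reserializing them N times.
final class CompressedDescriptorCache {
    private final ConcurrentHashMap<String, byte[]> cache = new ConcurrentHashMap<>();

    /** Returns the compressed bytes for the given result, compressing at most once. */
    byte[] getOrCompute(String resultId, byte[] serializedDescriptors) {
        return cache.computeIfAbsent(resultId, id -> compress(serializedDescriptors));
    }

    private static byte[] compress(byte[] payload) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bos)) {
            out.write(payload);
        } catch (IOException e) {
            throw new RuntimeException("compression failed", e);
        }
        return bos.toByteArray();
    }
}
{code}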



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23218:
---

Assignee: Zhilong Hong

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 2 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005._
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the JobManager doesn't need to 
> keep all the copies in the heap memory until the TaskDeploymentDescriptors 
> are all sent. There will be only one copy in the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size has exceeded the threshold.
> This improvement can help to avoid the long-term garbage collection during 
> task deployment.
> The cached ShuffleDescriptors in the blob server will be removed once the 
> partitions related to them are no longer valid. This makes sure the blob 
> server won't be full of cached ShuffleDescriptors, even if there's a 
> long-running session on the cluster.
> In part 3 we will limit the size of ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor 
> won't run out of space because of cached ShuffleDescriptors. For more details 
> please see FLINK-23354.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23218.
---
Resolution: Done

Done via ee7e9c3b87f6533d6f54361fddc71585d6b8ad61

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 2 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005._
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the JobManager doesn't need to 
> keep all the copies in the heap memory until the TaskDeploymentDescriptors 
> are all sent. There will be only one copy in the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size has exceeded the threshold.
> This improvement can help to avoid the long-term garbage collection during 
> task deployment.
> The cached ShuffleDescriptors in the blob server will be removed once the 
> partitions related to them are no longer valid. This makes sure the blob 
> server won't be full of cached ShuffleDescriptors, even if there's a 
> long-running session on the cluster.
> In part 3 we will limit the size of ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor 
> won't run out of space because of cached ShuffleDescriptors. For more details 
> please see FLINK-23354.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23479) IncrementalAggregateJsonPlanTest.testIncrementalAggregateWithSumCountDistinctAndRetraction fail

2021-07-26 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17387460#comment-17387460
 ] 

Zhu Zhu commented on FLINK-23479:
-

another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20976=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4

> IncrementalAggregateJsonPlanTest.testIncrementalAggregateWithSumCountDistinctAndRetraction
>  fail
> ---
>
> Key: FLINK-23479
> URL: https://issues.apache.org/jira/browse/FLINK-23479
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Dawid Wysakowicz
>Assignee: godfrey he
>Priority: Blocker
>  Labels: pull-request-available, stale-blocker, test-stability
> Fix For: 1.14.0, 1.13.3
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20864=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=8904
> {code}
> Jul 23 04:21:56 [ERROR] 
> testIncrementalAggregateWithSumCountDistinctAndRetraction(org.apache.flink.table.planner.plan.nodes.exec.stream.IncrementalAggregateJsonPlanTest)
>   Time elapsed: 0.067 s  <<< FAILURE!
> Jul 23 04:21:56 org.junit.ComparisonFailure: 
> 

[jira] [Commented] (FLINK-23470) Use blocking shuffles but pipeline within a slot for batch mode

2021-07-23 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17386107#comment-17386107
 ] 

Zhu Zhu commented on FLINK-23470:
-

Sorry I did not see the discussion in FLINK-23402 until I noticed this ticket.
I agree that a "pipeline within a slot" mode is valuable and easy for users to 
understand.
Actually, {{FORWARD_EDGES_PIPELINED}} was designed for this purpose. It works 
for Table/SQL jobs. However, it may not work for DataStream jobs because 
forward-connected upstream and downstream tasks can be assigned to different 
slot sharing groups.

Therefore, to achieve this goal, I think we can modify the handling of 
{{FORWARD_EDGES_PIPELINED}} in 
{{StreamingJobGraphGenerator#determineResultPartitionType()}} a bit to generate 
a PIPELINED edge only if the upstream and downstream tasks are in the same slot 
sharing group. And we may drop {{RESCALE_EDGES_PIPELINED}} and rename 
{{FORWARD_EDGES_PIPELINED}} to {{PIPELINED_WITHIN_SLOT}}/{{AUTOMATIC}}.
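
For illustration, a minimal, self-contained sketch of that rule (simplified stand-in types, not the actual {{StreamingJobGraphGenerator}} code): an edge stays PIPELINED only when both endpoints belong to the same slot sharing group, and becomes BLOCKING otherwise.

{code:java}
// A minimal, self-contained sketch of the rule above; the enum and classes are
// simplified stand-ins, not Flink's StreamingJobGraphGenerator.
enum ResultPartitionType { PIPELINED, BLOCKING }

final class VertexInfo {
    final String slotSharingGroup;
    VertexInfo(String slotSharingGroup) { this.slotSharingGroup = slotSharingGroup; }
}

final class ShuffleModeSketch {
    /** PIPELINED only if upstream and downstream are in the same slot sharing group. */
    static ResultPartitionType determineResultPartitionType(VertexInfo upstream, VertexInfo downstream) {
        return upstream.slotSharingGroup.equals(downstream.slotSharingGroup)
                ? ResultPartitionType.PIPELINED
                : ResultPartitionType.BLOCKING;
    }

    public static void main(String[] args) {
        VertexInfo source = new VertexInfo("default");
        VertexInfo mapper = new VertexInfo("default");
        VertexInfo sink = new VertexInfo("isolated");
        System.out.println(determineResultPartitionType(source, mapper)); // PIPELINED
        System.out.println(determineResultPartitionType(mapper, sink));   // BLOCKING
    }
}
{code}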

> Use blocking shuffles but pipeline within a slot for batch mode
> ---
>
> Key: FLINK-23470
> URL: https://issues.apache.org/jira/browse/FLINK-23470
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Timo Walther
>Priority: Major
>
> As discussed in FLINK-23402, we would like to introduce a good default 
> shuffle mode for batch runtime mode that is a trade-off between all pipelined 
> and all blocking shuffles.
> From the discussion in FLINK-23402:
> For the shuffle modes, I think those three settings are actually sufficient:
> 1. pipeline all, for batch execution that wants pipelined shuffles. (Still 
> batch recovery, no checkpoints, batch operators)
> 2. batch all, just in case you want to.
> 3. batch shuffles, pipeline within a slot. (DEFAULT)
> This should be the default, and it means we batch whenever a slot has a 
> dependency on another slot.
> A dependency between slots is:
> - any all-to-all connection (keyBy, broadcast, rebalance, random)
> - any pointwise connection (rescale)
> - any forward between different slot sharing groups
> Effectively, only FORWARD connections within the same slot sharing group have 
> no dependency on another slot.
> That mode makes a lot of sense as the default, because it guarantees that we 
> can always run the program as long as we have at least one slot. No resource 
> starvation ever. But it retains pipelining where we don't chain operators due 
> to missing chaining logic (but we still slot-share them).
> Compared to this (3) mode, FORWARD_EDGES_PIPELINED and 
> POINTWISE_EDGES_PIPELINED are not well-defined.
> POINTWISE_EDGES_PIPELINED is a gamble, it only works if you have a certain 
> amount of resources, related to the rescale factor. Otherwise the job may 
> fail with resource starvation. Hard to understand and debug for users; not a 
> great option in my opinion.
> FORWARD_EDGES_PIPELINED can also lead to job failure with resource starvation 
> when the forward connection connects different slot sharing groups.
> That's why I would drop those (they make it confusing for users), not reuse 
> the GlobalDataExchangeMode, and rather introduce option (3) above, which 
> mostly batches the exchanges, except when they are guaranteed to be in 
> the same slot.
> As a side note: The difference between (3) and (2) should be already 
> relatively small in SQL jobs and become smaller over time, as more and more 
> can be chained together.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-22 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385644#comment-17385644
 ] 

Zhu Zhu commented on FLINK-23218:
-

Thanks for confirming! [~trohrmann]
And thanks for the explanation of the transient blob option. I think you are 
right that we can try to re-offload {{JobInformation}}, {{TaskInformation}} and 
{{ShuffleDescriptors}} before deploying a task. It may need some extra effort 
though to track and de-duplicate blobs on the BlobServer. So in the first step 
we will try introducing a {{read()}} API in {{PermanentBlobService}}, which 
might be simpler.
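
For illustration only, the kind of addition being discussed might look like the sketch below. The interface name and parameter types are hypothetical stand-ins, not the actual {{PermanentBlobService}} signature.

{code:java}
import java.io.IOException;

// Hedged sketch of the kind of addition being discussed: a read() method that
// returns the blob content directly. The interface name and parameter types are
// hypothetical stand-ins, not the actual PermanentBlobService signature.
interface PermanentBlobReader {
    /** Reads and returns the content of the permanent blob identified by the given key. */
    byte[] read(String jobId, String blobKey) throws IOException;
}
{code}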

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 2 of the optimization related to task deployments. For more 
> details about the overall description and part 1, please see FLINK-23005._
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the JobManager doesn't need to 
> keep all the copies in the heap memory until the TaskDeploymentDescriptors 
> are all sent. There will be only one copy in the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size has exceeded the threshold.
> This improvement can help to avoid the long-term garbage collection during 
> task deployment.
> The cached ShuffleDescriptors in the blob server will be removed once the 
> partitions related to them are no longer valid. This makes sure the blob 
> server won't be full of cached ShuffleDescriptors, even if there's a 
> long-running session on the cluster.
> In part 3 we will limit the size of ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor 
> won't run out of space because of cached ShuffleDescriptors. For more details 
> please see FLINK-23354.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385234#comment-17385234
 ] 

Zhu Zhu commented on FLINK-23218:
-

To summarize the investigation and discussion: a common blob cache size limit 
may be hard and complicated to implement. It can also be hard for users to 
understand that there is a config to limit the blob cache size but it only 
works for some of the permanent blobs.

Therefore, I'm thinking of just limiting the size of the {{ShuffleDescriptor}} 
blob cache and leaving other existing blobs unaffected. In the first version we 
can hard-code the size limit to 100 MB. It should be enough for the 
{{ShuffleDescriptor}} blob cache, because a {{ShuffleDescriptor}} blob for an 
8000x8000 ALL-to-ALL {{JobEdge}} is just 200 KB or so, and normally there will 
not be too many such blobs. In this way, we do not need to expose a new config 
to users, and existing blob usages will not be affected.

To achieve that, we can add a {{readAndTrack()}} method to 
{{PermanentBlobCache}} and only use it for {{ShuffleDescriptor}} blobs.
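
A rough sketch of what such tracking could look like is below. The class, the {{readAndTrack()}} signature and the eviction policy are illustrative assumptions, not the actual {{PermanentBlobCache}} implementation.

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Rough sketch only: a tiny LRU-style cache that tracks the total size of cached
// ShuffleDescriptor blobs and evicts the least recently read ones once a hard-coded
// 100 MB limit is exceeded.
final class TrackedShuffleDescriptorCache {
    private static final long SIZE_LIMIT_BYTES = 100L * 1024 * 1024; // hard-coded 100 MB

    // accessOrder = true makes iteration start from the least recently used entry.
    private final LinkedHashMap<String, byte[]> blobs = new LinkedHashMap<>(16, 0.75f, true);
    private long totalBytes;

    /** Returns the cached blob, or caches and returns the freshly loaded bytes on a miss. */
    synchronized byte[] readAndTrack(String blobKey, byte[] loadedIfMissing) {
        byte[] cached = blobs.get(blobKey);
        if (cached != null) {
            return cached;
        }
        blobs.put(blobKey, loadedIfMissing);
        totalBytes += loadedIfMissing.length;
        evictIfNeeded();
        return loadedIfMissing;
    }

    private void evictIfNeeded() {
        Iterator<Map.Entry<String, byte[]>> it = blobs.entrySet().iterator();
        while (totalBytes > SIZE_LIMIT_BYTES && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            totalBytes -= eldest.getValue().length;
            it.remove();
        }
    }
}
{code}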

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partitions releasing. 
> However, the task deployment is still slow. For a job with two vertices, each 
> vertex has 8k parallelism and they are connected with the all-to-all edge. It 
> takes 32.611s to deploy all the tasks and make them transition to running. If 
> the parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors for _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors for _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original object. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these serialized 
> values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from 

[jira] [Comment Edited] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384876#comment-17384876
 ] 

Zhu Zhu edited comment on FLINK-23218 at 7/21/21, 1:02 PM:
---

I took a look at the code and it looks to me that transient blobs are not 
deleted once read. {{AbstractTaskManagerFileHandler}} will delete the transient 
blobs of log files, but that does not apply to all transient blobs in 
{{BlobServer}}.
However, I can see that transient blobs have a TTL by design, which starts 
ticking once they are generated and is refreshed once they are read. Cached 
{{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptor}} may be used 
many times over a long time span, which can exceed the TTL (e.g. a job may have 
been running for days before a failover happens). So I think transient blobs 
cannot meet the requirement.


was (Author: zhuzh):
I took a look at the code and looks to me that transient blobs are not deleted 
once read. {{AbstractTaskManagerFileHandler}} will delete the transient blobs 
of log files but it does not apply to all transient blobs in {{BlobServer}}.
However, I can see that transient blobs have a TTL by design which will start 
ticking once generated and will be refreshed once read. Cached 
{{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptor}} are possible 
to be used for many times during a long time span which can exceeds the TTL. So 
I think current transient blobs cannot meet the requirement.

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partitions releasing. 
> However, the task deployment is still slow. For a job with two vertices, each 
> vertex has 8k parallelism and they are connected with the all-to-all edge. It 
> takes 32.611s to deploy all the tasks and make them transition to running. If 
> the parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors for _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors for _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original object. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these serialized 
> values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot 

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384876#comment-17384876
 ] 

Zhu Zhu commented on FLINK-23218:
-

I took a look at the code and it looks to me that transient blobs are not 
deleted once read. {{AbstractTaskManagerFileHandler}} will delete the transient 
blobs of log files, but that does not apply to all transient blobs in 
{{BlobServer}}.
However, I can see that transient blobs have a TTL by design, which starts 
ticking once they are generated and is refreshed once they are read. Cached 
{{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptor}} may be used 
many times over a long time span, which can exceed the TTL. So I think the 
current transient blobs cannot meet the requirement.

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partitions releasing. 
> However, the task deployment is still slow. For a job with two vertices, each 
> vertex has 8k parallelism and they are connected with the all-to-all edge. It 
> takes 32.611s to deploy all the tasks and make them transition to running. If 
> the parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors for _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors for _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original object. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these serialized 
> values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need to keep 
> all the copies in the heap memory until the TaskDeploymentDescriptors are all 
> sent. There will be only one copy on the blob server. Like the 
> JobInformation, we can just distribute the 

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384868#comment-17384868
 ] 

Zhu Zhu commented on FLINK-23218:
-

I think the PR mentioned above should be 
https://github.com/apache/flink/pull/16498.



> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partitions releasing. 
> However, the task deployment is still slow. For a job with two vertices, each 
> vertex has 8k parallelism and they are connected with the all-to-all edge. It 
> takes 32.611s to deploy all the tasks and make them transition to running. If 
> the parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors for _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors for _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original object. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these serialized 
> values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need to keep 
> all the copies in the heap memory until the TaskDeploymentDescriptors are all 
> sent. There will be only one copy on the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size has exceeded the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the performance 
> of our 

[jira] [Closed] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22677.
---
Resolution: Done

Done via
0d099b79fddc5e254884e44f2167c625744079a4
0b28fadccfb6b0d2a85592ced9e98b03a0c2d3bf

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The current scheduler enforces a synchronous registration even though the API 
> ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
> the scenario of a remote shuffle service, the communication between the 
> ShuffleMaster and the remote cluster tends to be expensive. A synchronous 
> registration risks blocking the main thread and might cause negative side 
> effects like heartbeat timeouts. Additionally, expensive synchronous 
> invocations to the remote cluster could bottleneck the throughput of applying 
> for shuffle resources, especially for batch jobs with complicated DAGs.
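
For illustration, a simplified sketch of the difference between blocking on the returned future and composing it asynchronously. The types and helper names below are stand-ins, not the scheduler's actual code.

{code:java}
import java.util.concurrent.CompletableFuture;

// Simplified sketch of the idea: instead of join()-ing the future returned by
// registerPartitionWithProducer (which blocks the calling thread), compose the
// deployment step onto it. Types and method names here are illustrative only.
final class AsyncRegistrationSketch {

    /** Stand-in for ShuffleMaster#registerPartitionWithProducer. */
    static CompletableFuture<String> registerPartitionWithProducer(String partition) {
        return CompletableFuture.supplyAsync(() -> "descriptor-for-" + partition);
    }

    public static void main(String[] args) {
        // Synchronous style (what the scheduler effectively enforces today):
        // blocks the calling thread until the remote registration finishes.
        String blocking = registerPartitionWithProducer("p1").join();
        System.out.println("blocking result: " + blocking);

        // Truly asynchronous style: continue immediately and deploy the task
        // only when the shuffle registration completes.
        CompletableFuture<Void> pipeline =
                registerPartitionWithProducer("p2")
                        .thenAccept(descriptor -> System.out.println("deploy with " + descriptor));
        pipeline.join(); // only for the demo, so the JVM waits for the async step
    }
}
{code}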



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22672) Some enhancements for pluggable shuffle service framework

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22672:
---

Assignee: Jin Xing

> Some enhancements for pluggable shuffle service framework
> -
>
> Key: FLINK-22672
> URL: https://issues.apache.org/jira/browse/FLINK-22672
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
> Fix For: 1.14.0
>
>
> "Pluggable shuffle service" in Flink provides an architecture which are 
> unified for both streaming and batch jobs, allowing user to customize the 
> process of data transfer between shuffle stages according to scenarios.
> There are already a number of implementations of "remote shuffle service" on 
> Spark like [1][2][3]. Remote shuffle enables to shuffle data from/to a remote 
> cluster and achieves benefits like :
>  # The lifecycle of computing resource can be decoupled with shuffle data, 
> once computing task is finished, idle computing nodes can be released with 
> its completed shuffle data accommodated on remote shuffle cluster.
>  # There is no need to reserve disk capacity for shuffle on computing nodes. 
> Remote shuffle cluster serves shuffling request with better scaling ability 
> and alleviates the local disk pressure on computing nodes when data skew.
> Based on "pluggable shuffle service", we build our own "remote shuffle 
> service" on Flink –- Lattice, which targets to provide functionalities and 
> improve performance for batch processing jobs. Basically it works as below:
>  # Lattice cluster works as an independent service for shuffling request;
>  # LatticeShuffleMaster extends ShuffleMaster, works inside JM and talks with 
> remote Lattice cluster for shuffle resource application and shuffle data 
> lifecycle management;
>  # LatticeShuffleEnvironment extends ShuffleEnvironment, works inside TM and 
> provides an environment for shuffling data from/to remote Lattice cluster;
> During the process of building Lattice we find some potential enhancements on 
> "pluggable shuffle service". I will enumerate and create some sub JIRAs under 
> this umbrella
>  
> [1] 
> [https://www.alibabacloud.com/blog/emr-remote-shuffle-service-a-powerful-elastic-tool-of-serverless-spark_597728]
> [2] [https://bestoreo.github.io/post/cosco/cosco/]
> [3] [https://github.com/uber/RemoteShuffleService]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22676.
---
Resolution: Done

Done via 62a342b647fc1eac7f87769be92fda798649d6d4

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In current Flink, a data partition is bound to the ResourceID of its TM in 
> Execution#startTrackingPartitions, and the partition tracker will stop 
> tracking the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is 
> bound to the computing resource (TM). This works fine for the internal 
> shuffle service, but not for a remote shuffle service. Since shuffle data is 
> stored remotely, the lifecycle of a completed partition can be decoupled from 
> the TM, i.e. the TM can safely be released when no computing task runs on it, 
> and further shuffle read requests can be directed to the remote shuffle 
> cluster. In addition, when a TM is lost, its completed data partitions on the 
> remote shuffle cluster do not need to be reproduced.
>  
> The issue mentioned above arises because Flink's JobMasterPartitionTracker 
> mixes up a partition's locationID (where the partition is located) and tmID 
> (which TM the partition is produced from). In TM-internal shuffle, a 
> partition's locationID is the same as its tmID, but this is not the case in 
> remote shuffle; JobMasterPartitionTracker, as an independent component, 
> should be able to differentiate the locationID and tmID of a partition and 
> thus handle the lifecycle of a partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration will 
> be as below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers the partition to 
> ShuffleMaster and gets a ShuffleDescriptor. The current 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose to rename this method properly and always return the 
> locationID of the partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducedPartitions then registers the partition to 
> JMPartitionTracker with the tmID (ResourceID of the TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index a partition with both tmID and 
> locationID;
> *B. Invocations from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens to invocations from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals the 
> locationID of a tracked partition. If so, tracking of the corresponding 
> partition will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location gets lost, it will unregister the corresponding partitions by 
> locationID;
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding indexes for tmID and locationID, and then releases the 
> partition according to the shuffle service type --
>  # If the locationID equals the tmID, the partition is accommodated by the 
> TM-internal shuffle service, and JMPartitionTracker will invoke 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal the tmID, the partition is accommodated 
> by the external shuffle service, and JMPartitionTracker will invoke 
> ShuffleMaster for the release;
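
For illustration, a simplified sketch of indexing partitions under both keys so they can be released by either one. The types below are hypothetical stand-ins, not JobMasterPartitionTracker itself.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified sketch (hypothetical types, not JobMasterPartitionTracker itself) of
// indexing a partition under both the producing TM (tmID) and its data location
// (locationID), so it can be released by either key.
final class DualIndexTrackerSketch {
    private final Map<String, Set<String>> partitionsByTm = new HashMap<>();
    private final Map<String, Set<String>> partitionsByLocation = new HashMap<>();

    void startTracking(String partitionId, String tmId, String locationId) {
        partitionsByTm.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        partitionsByLocation.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    /** Called when a TM disconnects: only partitions actually located on that TM are dropped. */
    Set<String> stopTrackingLocation(String locationId) {
        Set<String> released = partitionsByLocation.remove(locationId);
        if (released == null) {
            return Set.of();
        }
        partitionsByTm.values().forEach(set -> set.removeAll(released));
        return released;
    }
}
{code}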



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22676:

Affects Version/s: (was: 1.4)
   1.14.0

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In current Flink, data partition is bound with the ResourceID of TM in 
> Execution#startTrackingPartitions and partition tracker will stop tracking 
> corresponding partitions when a TM 
> disconnects(JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle 
> data is bound with computing resource (TM). It works fine for internal 
> shuffle service, but doesn't for remote shuffle service. Note that shuffle 
> data is accommodated on remote, the lifecycle of a completed partition is 
> capable to be decoupled with TM, i.e. TM is totally fine to be released when 
> no computing task on it and further shuffle reading requests could be 
> directed to remote shuffle cluster. In addition, when a TM is lost, its 
> completed data partitions on remote shuffle cluster could avoid reproducing.
>  
> The issue mentioned above is because Flink JobMasterPartitionTracker mixed up 
> partition's locationID (where the partition is located) and tmID (which TM 
> the partition is produced from). In TM internal shuffle, partition's 
> locationID is the same with tmID, but it is not in remote shuffle; 
> JobMasterPartitionTracker as an independent component should be able to 
> differentiate locationID and tmID of a partition, thus to handle the 
> lifecycle of a partition properly;
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration will 
> be like below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers partition to ShuffleMaster 
> and get a ShuffleDescriptor. Current ShuffleDescriptor#storesLocalResourcesOn 
> only returns the location of the producing TM if the partition occupies local 
> resources there.
>  We propose to rename this method properly and always return the 
> locationID of the partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducePartitions then registers partition to 
> JMPartitionTracker with tmID (ResourceID of TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will indexes a partition with both tmID and 
> locationID;
> *B. Invokes from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens invokes from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals to a 
> certain locationID of a partition. If so, tracking of the corresponding 
> partition will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location gets lost, it will unregister corresponding partitions by locationID;
> *C. Partition Unregistration*
> When unregister a partition, JobMasterPartitionTracker removes the 
> corresponding indexes to tmID and locationID firstly, and then release the 
> partition by shuffle service types --
>  # If the locationID equals to the tmID, it indicates the partition is 
> accommodated by TM internal shuffle service, JMPartitionTracker will invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal to tmID, it indicates the partition is 
> accommodated by external shuffle service, JMPartitionTracker will invokes 
> ShuffleMaster for the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22676:

Fix Version/s: 1.14.0

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.4
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In current Flink, data partition is bound with the ResourceID of TM in 
> Execution#startTrackingPartitions and partition tracker will stop tracking 
> corresponding partitions when a TM 
> disconnects(JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle 
> data is bound with computing resource (TM). It works fine for internal 
> shuffle service, but doesn't for remote shuffle service. Note that shuffle 
> data is accommodated on remote, the lifecycle of a completed partition is 
> capable to be decoupled with TM, i.e. TM is totally fine to be released when 
> no computing task on it and further shuffle reading requests could be 
> directed to remote shuffle cluster. In addition, when a TM is lost, its 
> completed data partitions on remote shuffle cluster could avoid reproducing.
>  
> The issue mentioned above is because Flink JobMasterPartitionTracker mixed up 
> partition's locationID (where the partition is located) and tmID (which TM 
> the partition is produced from). In TM internal shuffle, partition's 
> locationID is the same with tmID, but it is not in remote shuffle; 
> JobMasterPartitionTracker as an independent component should be able to 
> differentiate locationID and tmID of a partition, thus to handle the 
> lifecycle of a partition properly;
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration will 
> be like below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers partition to ShuffleMaster 
> and get a ShuffleDescriptor. Current ShuffleDescriptor#storesLocalResourcesOn 
> only returns the location of the producing TM if the partition occupies local 
> resources there.
>  We propose to rename this method properly and always return the 
> locationID of the partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducePartitions then registers partition to 
> JMPartitionTracker with tmID (ResourceID of TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will indexes a partition with both tmID and 
> locationID;
> *B. Invokes from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens invokes from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals to a 
> certain locationID of a partition. If so, tracking of the corresponding 
> partition will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location gets lost, it will unregister corresponding partitions by locationID;
> *C. Partition Unregistration*
> When unregister a partition, JobMasterPartitionTracker removes the 
> corresponding indexes to tmID and locationID firstly, and then release the 
> partition by shuffle service types --
>  # If the locationID equals to the tmID, it indicates the partition is 
> accommodated by TM internal shuffle service, JMPartitionTracker will invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal to tmID, it indicates the partition is 
> accommodated by external shuffle service, JMPartitionTracker will invokes 
> ShuffleMaster for the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22676:
---

Assignee: Jin Xing

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
>
> In current Flink, data partition is bound with the ResourceID of TM in 
> Execution#startTrackingPartitions and partition tracker will stop tracking 
> corresponding partitions when a TM 
> disconnects(JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle 
> data is bound with computing resource (TM). It works fine for internal 
> shuffle service, but doesn't for remote shuffle service. Note that shuffle 
> data is accommodated on remote, the lifecycle of a completed partition is 
> capable to be decoupled with TM, i.e. TM is totally fine to be released when 
> no computing task on it and further shuffle reading requests could be 
> directed to remote shuffle cluster. In addition, when a TM is lost, its 
> completed data partitions on remote shuffle cluster could avoid reproducing.
>  
> The issue mentioned above is because Flink JobMasterPartitionTracker mixed up 
> partition's locationID (where the partition is located) and tmID (which TM 
> the partition is produced from). In TM internal shuffle, partition's 
> locationID is the same with tmID, but it is not in remote shuffle; 
> JobMasterPartitionTracker as an independent component should be able to 
> differentiate locationID and tmID of a partition, thus to handle the 
> lifecycle of a partition properly;
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration will 
> be like below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers partition to ShuffleMaster 
> and get a ShuffleDescriptor. Current ShuffleDescriptor#storesLocalResourcesOn 
> only returns the location of the producing TM if the partition occupies local 
> resources there.
>  We propose to rename this method properly and always return the 
> locationID of the partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducePartitions then registers partition to 
> JMPartitionTracker with tmID (ResourceID of TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will indexes a partition with both tmID and 
> locationID;
> *B. Invokes from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens invokes from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals to a 
> certain locationID of a partition. If so, tracking of the corresponding 
> partition will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location gets lost, it will unregister corresponding partitions by locationID;
> *C. Partition Unregistration*
> When unregister a partition, JobMasterPartitionTracker removes the 
> corresponding indexes to tmID and locationID firstly, and then release the 
> partition by shuffle service types --
>  # If the locationID equals to the tmID, it indicates the partition is 
> accommodated by TM internal shuffle service, JMPartitionTracker will invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal to tmID, it indicates the partition is 
> accommodated by external shuffle service, JMPartitionTracker will invokes 
> ShuffleMaster for the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-07-18 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22676:

Affects Version/s: 1.4

> The partition tracker should support remote shuffle properly
> 
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.4
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
>
> In current Flink, a data partition is bound to the ResourceID of its TM in 
> Execution#startTrackingPartitions, and the partition tracker stops tracking 
> the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is 
> bound to the computing resource (TM). This works fine for the internal 
> shuffle service, but not for a remote shuffle service. Since the shuffle 
> data is stored on the remote cluster, the lifecycle of a completed partition 
> can be decoupled from the TM, i.e. a TM can safely be released once no 
> computing task runs on it, and further shuffle read requests can be directed 
> to the remote shuffle cluster. In addition, when a TM is lost, its completed 
> data partitions on the remote shuffle cluster do not need to be reproduced.
>  
> The issue above arises because Flink's JobMasterPartitionTracker mixes up a 
> partition's locationID (where the partition is located) and its tmID (which 
> TM the partition is produced from). For TM-internal shuffle, a partition's 
> locationID is the same as its tmID, but this does not hold for remote 
> shuffle. JobMasterPartitionTracker, as an independent component, should be 
> able to differentiate the locationID and tmID of a partition and thus handle 
> the lifecycle of a partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions by 
> both locationID and tmID. The process of registration and unregistration 
> would be as below:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers the partition with the 
> ShuffleMaster and gets a ShuffleDescriptor. The current 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose to give this method a proper name and let it always return the 
> locationID of the partition. It might look like below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducedPartitions then registers the partition with 
> JMPartitionTracker, using the tmID (the ResourceID of the TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index a partition by both tmID and 
> locationID;
> *B. Invocations from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens for invocations from both the JM and 
> the ShuffleMaster.
>  # When JMPartitionTracker learns from JobMaster#disconnectTaskManager that a 
> TM disconnects, it checks whether the disconnected tmID equals the locationID 
> of any tracked partition. If so, tracking of the corresponding partition is 
> stopped.
>  # When JobMasterPartitionTracker learns from the ShuffleMaster that a data 
> location is lost, it unregisters the corresponding partitions by locationID;
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding tmID and locationID indexes, and then releases the partition 
> according to the shuffle service type --
>  # If the locationID equals the tmID, the partition is served by the 
> TM-internal shuffle service, and JMPartitionTracker invokes the 
> TaskExecutorGateway for the release;
>  # If the locationID does not equal the tmID, the partition is served by an 
> external shuffle service, and JMPartitionTracker invokes the ShuffleMaster 
> for the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22017:

Comment: was deleted

(was: I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I 
help the community manage its development. I see this issue has been marked as 
Critical but is unassigned, and neither it nor its Sub-Tasks have been 
updated for 7 days. I have gone ahead and marked it "stale-critical". If this 
ticket is critical, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be 
deprioritized.
)

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v<layer>_<number>. The edge connecting v2_2 and 
> v3_2 clearly crosses region 1 and region 2. Since region 1 has no blocking 
> edges connected to other regions, it will be scheduled first. When vertex2_2 
> is finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled then, since all its consumed 
> partitions are consumable. But in fact region 2 won't be scheduled, because 
> the result partition of vertex2_2 is not tagged as consumable. Whether it is 
> consumable or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution, we should let BLOCKING result partitions be consumable 
> individually (see the sketch below). Note that this will make the scheduling 
> execution-vertex-wise instead of stage-wise, with a nice side effect of 
> better resource utilization. The PipelinedRegionSchedulingStrategy can be 
> simplified along with this change to get rid of the correlatedResultPartitions.
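> As a rough illustration of the proposed per-partition check; the types below 
> are simplified placeholders, not the actual scheduler classes:
> {code:java}
> // Hedged sketch: a BLOCKING result partition becomes consumable as soon as its
> // own producer finishes, independent of the other partitions in the same
> // IntermediateDataSet. All types here are illustrative placeholders.
> final class ConsumableCheckSketch {
> 
>     enum PartitionType { PIPELINED, BLOCKING }
>     enum ExecutionState { CREATED, RUNNING, FINISHED }
> 
>     static final class ResultPartition {
>         PartitionType type;
>         ExecutionState producerState;
>         boolean hasDataProduced; // relevant for PIPELINED partitions
>     }
> 
>     static boolean isConsumable(ResultPartition partition) {
>         if (partition.type == PartitionType.BLOCKING) {
>             // proposed: only this partition's own producer must be FINISHED,
>             // instead of all producers of the whole IntermediateDataSet
>             return partition.producerState == ExecutionState.FINISHED;
>         }
>         // PIPELINED partitions are consumable once data starts to be produced
>         return partition.hasDataProduced;
>     }
> }
> {code}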



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22017:

Comment: was deleted

(was: This critical issue is unassigned, and neither it nor any of its Sub-Tasks 
have been updated for 7 days. So, it has been labeled "stale-critical". If 
this ticket is indeed critical, please either assign yourself or give an 
update. Afterwards, please remove the label. In 7 days the issue will be 
deprioritized.)

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v<layer>_<number>. The edge connecting v2_2 and 
> v3_2 clearly crosses region 1 and region 2. Since region 1 has no blocking 
> edges connected to other regions, it will be scheduled first. When vertex2_2 
> is finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled then, since all its consumed 
> partitions are consumable. But in fact region 2 won't be scheduled, because 
> the result partition of vertex2_2 is not tagged as consumable. Whether it is 
> consumable or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution, we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22017:

Comment: was deleted

(was: I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I 
help the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is indeed Major, please either assign yourself or give an update. 
Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.
)

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v<layer>_<number>. The edge connecting v2_2 and 
> v3_2 clearly crosses region 1 and region 2. Since region 1 has no blocking 
> edges connected to other regions, it will be scheduled first. When vertex2_2 
> is finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled then, since all its consumed 
> partitions are consumable. But in fact region 2 won't be scheduled, because 
> the result partition of vertex2_2 is not tagged as consumable. Whether it is 
> consumable or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution, we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22017:

Comment: was deleted

(was: This issue was labeled "stale-critical" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Critical, 
please raise the priority and ask a committer to assign you the issue or revive 
the public discussion.
)

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v<layer>_<number>. The edge connecting v2_2 and 
> v3_2 clearly crosses region 1 and region 2. Since region 1 has no blocking 
> edges connected to other regions, it will be scheduled first. When vertex2_2 
> is finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled then, since all its consumed 
> partitions are consumable. But in fact region 2 won't be scheduled, because 
> the result partition of vertex2_2 is not tagged as consumable. Whether it is 
> consumable or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution, we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-07-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22017.
---
  Assignee: Zhilong Hong
Resolution: Fixed

Fixed via 
d2005268b1eeb0fe928b69c5e56ca54862fbf508
eb8100f7afe1cd2b6fceb55b174de097db752fc7

> Regions may never be scheduled when there are cross-region blocking edges
> -
>
> Key: FLINK-22017
> URL: https://issues.apache.org/jira/browse/FLINK-22017
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration.jpg
>
>
> For a topology with cross-region blocking edges, there are regions that may 
> never be scheduled. The case is illustrated in the figure below.
> !Illustration.jpg!
> Let's denote the vertices as v<layer>_<number>. The edge connecting v2_2 and 
> v3_2 clearly crosses region 1 and region 2. Since region 1 has no blocking 
> edges connected to other regions, it will be scheduled first. When vertex2_2 
> is finished, PipelinedRegionSchedulingStrategy will trigger 
> {{onExecutionStateChange}} for it.
> One would expect region 2 to be scheduled then, since all its consumed 
> partitions are consumable. But in fact region 2 won't be scheduled, because 
> the result partition of vertex2_2 is not tagged as consumable. Whether it is 
> consumable or not is determined by its IntermediateDataSet.
> However, an IntermediateDataSet is consumable if and only if all the 
> producers of its IntermediateResultPartitions are finished. This 
> IntermediateDataSet will never be consumable since vertex2_3 is not 
> scheduled. All in all, this forms a deadlock: a region will never be 
> scheduled because it has not been scheduled.
> As a solution, we should let BLOCKING result partitions be consumable 
> individually. Note that this will make the scheduling execution-vertex-wise 
> instead of stage-wise, with a nice side effect of better resource 
> utilization. The PipelinedRegionSchedulingStrategy can be simplified along 
> with this change to get rid of the correlatedResultPartitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377963#comment-17377963
 ] 

Zhu Zhu commented on FLINK-23218:
-

I gave it another thought and 10GB sounds good to me now, if we always want 
users to limit the size when configuring "blob.offload.minsize" for large 
scale jobs. Adding a limit by default is always better than no limit.

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partitions releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, like 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected by an all-to-all edge with parallelism _N_, we need to 
> calculate _N_ ShuffleDescriptors _N_ times. However, these vertices share 
> the same ShuffleDescriptors since they all consume the same 
> IntermediateResultPartitions, so we don't need to calculate the 
> ShuffleDescriptors for each downstream vertex individually. We can just 
> cache them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can just cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these 
> serialized values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need 
> to keep all the copies in heap memory until the TaskDeploymentDescriptors 
> are all sent; there will be only one copy on the blob server. Like the 
> JobInformation, we can distribute the cached ShuffleDescriptors via the 
> blob server if their overall size exceeds the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
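> A rough sketch of the caching part of this proposal; the class and method 
> names are simplified placeholders, not the actual Flink implementation, and 
> the blob offloading step is omitted:
> {code:java}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.zip.DeflaterOutputStream;
> 
> // Hedged sketch: serialize and compress the ShuffleDescriptors of an
> // IntermediateResult once and reuse the bytes for every downstream task,
> // instead of recomputing them N times. Names are illustrative placeholders.
> final class ShuffleDescriptorCacheSketch {
> 
>     private final Map<String, byte[]> cache = new ConcurrentHashMap<>();
> 
>     /** Returns the compressed serialized descriptors, computed only once per result id. */
>     byte[] getOrCompute(String intermediateResultId, List<? extends Serializable> descriptors) {
>         return cache.computeIfAbsent(intermediateResultId,
>                 id -> serializeAndCompress(descriptors));
>     }
> 
>     private static byte[] serializeAndCompress(List<? extends Serializable> descriptors) {
>         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>         try (ObjectOutputStream out =
>                 new ObjectOutputStream(new DeflaterOutputStream(bytes))) {
>             out.writeObject(new ArrayList<>(descriptors));
>         } catch (IOException e) {
>             throw new RuntimeException("Failed to serialize shuffle descriptors", e);
>         }
>         // the deflater stream is finished when the ObjectOutputStream is closed,
>         // so the byte array now holds the complete compressed payload
>         return bytes.toByteArray();
>     }
> }
> {code}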
> h3. Comparison
> We 

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377961#comment-17377961
 ] 

Zhu Zhu commented on FLINK-23218:
-

10GB looks a bit too large as a default limit for the blob size.
If we want the limit to be very large by default, maybe we can make it 
unlimited by default? Users who are changing the configuration 
"blob.offload.minsize" for large scale jobs should be aware of the residual 
issue and set the size limit.

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partitions releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, like 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected by an all-to-all edge with parallelism _N_, we need to 
> calculate _N_ ShuffleDescriptors _N_ times. However, these vertices share 
> the same ShuffleDescriptors since they all consume the same 
> IntermediateResultPartitions, so we don't need to calculate the 
> ShuffleDescriptors for each downstream vertex individually. We can just 
> cache them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can just cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these 
> serialized values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need 
> to keep all the copies in heap memory until the TaskDeploymentDescriptors 
> are all sent; there will be only one copy on the blob server. Like the 
> JobInformation, we can distribute the cached ShuffleDescriptors via the 
> blob server if their overall size exceeds the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, 

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377811#comment-17377811
 ] 

Zhu Zhu commented on FLINK-23218:
-

1. To not affect existing users, I prefer the limit to not be too small, 
otherwise the deployment performance may be affected and more blob downloads 
can result in heavier master/dfs loads. 1GB sounds good to me. Note that for 
most users, jobs are small scale, so the {{ShuffleDescriptors}} cache can be 
very small and will not be shipped via blobs. Therefore a large limit will not 
cause new issues (compared to the current state where there is no limitation). 
2. For small scale jobs, the {{ShuffleDescriptors}} cache will not be shipped 
via blobs, so residue problems will not get worse. Even for large scale jobs, 
IIRC, the compressed {{ShuffleDescriptors}} cache of an 8000x8000 shuffle is 
200k+ bytes, which still does not exceed the 1MB blob offloading threshold. 
Therefore I think we can document the configuration "blob.offload.minsize" to 
notify users about the residuals and the blob size limit.




> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partitions releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and make them transition to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the jobmanager, the jobmanager cannot deal with other akka messages, like 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected by an all-to-all edge with parallelism _N_, we need to 
> calculate _N_ ShuffleDescriptors _N_ times. However, these vertices share 
> the same ShuffleDescriptors since they all consume the same 
> IntermediateResultPartitions, so we don't need to calculate the 
> ShuffleDescriptors for each downstream vertex individually. We can just 
> cache them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can just cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, these 
> serialized values can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> 

[jira] [Closed] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-07-06 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-15031.
---
Fix Version/s: (was: 1.12.0)
   1.14.0
   Resolution: Fixed

Done via 
2c52816540e0d5b7c1b3ccdfa35cadc2d026e25b
e32f0f82164512f632a533fad01ead6a12ac8152

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task will not fail due to insufficient resources in the 
> TM, which could otherwise result in unnecessary failures and may even cause 
> a job to hang forever, failing repeatedly on deploying tasks to a TM with 
> insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task can be 
> deployed to a TM with insufficient network buffers and then fail on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.
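> A small sketch of this calculation for the {{NettyShuffleService}} case; the 
> helper names and wiring are illustrative placeholders, not actual Flink code:
> {code:java}
> // Hedged sketch of the calculation described above; illustrative only.
> final class NetworkMemorySketch {
> 
>     /** Buffers needed by one task (ExecutionVertex). */
>     static int buffersForTask(int numInputChannels, int buffersPerChannel, int numSubpartitions) {
>         int exclusiveInputBuffers = numInputChannels * buffersPerChannel;
>         int resultPartitionBuffers = numSubpartitions + 1; // current pool requirement
>         return exclusiveInputBuffers + resultPartitionBuffers;
>     }
> 
>     /** Buffers declared for an ExecutionJobVertex: the max over all its ExecutionVertices. */
>     static int buffersForJobVertex(int[] buffersPerExecutionVertex) {
>         int max = 0;
>         for (int buffers : buffersPerExecutionVertex) {
>             max = Math.max(max, buffers);
>         }
>         return max;
>     }
> 
>     /** Shuffle memory of a slot sharing group: the sum over the job vertices it contains. */
>     static long shuffleMemoryBytes(int[] buffersPerJobVertex, long bufferSizeBytes) {
>         long totalBuffers = 0;
>         for (int buffers : buffersPerJobVertex) {
>             totalBuffers += buffers;
>         }
>         return totalBuffers * bufferSizeBytes;
>     }
> }
> {code}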



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23262) FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on azure

2021-07-06 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375376#comment-17375376
 ] 

Zhu Zhu commented on FLINK-23262:
-

another instance:
https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/19956/logs/104

> FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on azure
> ---
>
> Key: FLINK-23262
> URL: https://issues.apache.org/jira/browse/FLINK-23262
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Priority: Major
>  Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19942=logs=219e462f-e75e-506c-3671-5017d866ccf6=4c5dc768-5c82-5ab0-660d-086cb90b76a0=5584
> {code}
> Jul 05 22:19:00 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 4.334 s <<< FAILURE! - in 
> org.apache.flink.test.streaming.api.FileReadingWatermarkITCase
> Jul 05 22:19:00 [ERROR] 
> testWatermarkEmissionWithChaining(org.apache.flink.test.streaming.api.FileReadingWatermarkITCase)
>   Time elapsed: 4.16 s  <<< FAILURE!
> Jul 05 22:19:00 java.lang.AssertionError: too few watermarks emitted: 4
> Jul 05 22:19:00   at org.junit.Assert.fail(Assert.java:89)
> Jul 05 22:19:00   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 05 22:19:00   at 
> org.apache.flink.test.streaming.api.FileReadingWatermarkITCase.testWatermarkEmissionWithChaining(FileReadingWatermarkITCase.java:65)
> Jul 05 22:19:00   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jul 05 22:19:00   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 05 22:19:00   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 05 22:19:00   at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 05 22:19:00   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jul 05 22:19:00   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 05 22:19:00   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jul 05 22:19:00   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jul 05 22:19:00   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Jul 05 22:19:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Jul 05 22:19:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jul 05 22:19:00   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> Jul 05 22:19:00   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375218#comment-17375218
 ] 

Zhu Zhu commented on FLINK-22677:
-

One thing to mention is that I did not change {{AdaptiveScheduler}} to 
support async partition registration, because otherwise we would need to 
introduce an async callback process to {{AdaptiveScheduler}}, which was 
intentionally avoided. Given that a customized {{ShuffleMaster}} is mainly 
meant for external shuffle of batch jobs and reactive mode is for streaming 
jobs only, I think for reactive mode we can keep the assumption that the 
{{ShuffleMaster}} completes partition registration immediately. 
WDYT? [~trohrmann]

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The current scheduler enforces a synchronous registration even though the 
> API ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. 
> In the scenario of a remote shuffle service, the communication between the 
> ShuffleMaster and the remote cluster tends to be expensive. A synchronous 
> registration risks blocking the main thread and might cause negative side 
> effects like heartbeat timeouts. Additionally, expensive synchronous 
> invocations to the remote cluster could bottleneck the throughput of 
> applying for shuffle resources, especially for batch jobs with complicated 
> DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375217#comment-17375217
 ] 

Zhu Zhu commented on FLINK-22677:
-

The problems below could happen if partition registration is made async:
1. a task is deployed before its produced partitions are registered; this can 
result in a bad {{ResultPartitionDeploymentDescriptor}}
2. a consumer task is deployed before its consumed partitions are registered, 
if the consumer and producer are in the same region
3. partitions can be leaked in {{JobMasterPartitionTracker}} if the producer 
task is failed/canceled before registration is done
4. partitions can be leaked in the shuffle service if the producer task is 
failed/canceled before registration is done

To solve these problems, I'd propose to:
1. Let DefaultScheduler deploy a task only after all its partitions have 
completed registration. This solves problems #1 and #2. #2 is solved because 
we now always deploy in topological order, which means that when a task is 
deployed, all its upstream tasks are deployed and their partitions are 
registered already.
2. Check the producer state on partition registration completion. If the 
producer is not {{SCHEDULED}}, directly ask the {{ShuffleMaster}} to release 
the partition and do not track it.

A PR is opened as proposed above. A rough sketch of this flow is below.
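The sketch uses simplified placeholder types, not the actual scheduler code:
{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hedged sketch only: types and method names are simplified placeholders.
final class AsyncPartitionRegistrationSketch {

    enum ExecutionState { SCHEDULED, DEPLOYING, CANCELED, FAILED }

    interface ShuffleMasterSketch {
        CompletableFuture<String> registerPartitionWithProducer(String partitionId);
        void releasePartitionExternally(String shuffleDescriptor);
    }

    /** Deploy a task only after all of its produced partitions finished registration. */
    static CompletableFuture<Void> registerThenDeploy(
            List<String> producedPartitionIds,
            ShuffleMasterSketch shuffleMaster,
            Supplier<ExecutionState> producerState,
            Runnable deployTask) {

        CompletableFuture<?>[] registrations = producedPartitionIds.stream()
                .map(partitionId -> shuffleMaster.registerPartitionWithProducer(partitionId)
                        .thenAccept(shuffleDescriptor -> {
                            if (producerState.get() != ExecutionState.SCHEDULED) {
                                // producer was canceled/failed in the meantime:
                                // do not track the partition, release it right away
                                shuffleMaster.releasePartitionExternally(shuffleDescriptor);
                            }
                            // otherwise the partition would start being tracked here
                        }))
                .toArray(CompletableFuture[]::new);

        // deploy only once every registration has completed, and only if the
        // producer is still in a deployable state
        return CompletableFuture.allOf(registrations).thenRun(() -> {
            if (producerState.get() == ExecutionState.SCHEDULED) {
                deployTask.run();
            }
        });
    }
}
{code}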

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The current scheduler enforces a synchronous registration even though the 
> API ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. 
> In the scenario of a remote shuffle service, the communication between the 
> ShuffleMaster and the remote cluster tends to be expensive. A synchronous 
> registration risks blocking the main thread and might cause negative side 
> effects like heartbeat timeouts. Additionally, expensive synchronous 
> invocations to the remote cluster could bottleneck the throughput of 
> applying for shuffle resources, especially for batch jobs with complicated 
> DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-22677:
---

Assignee: Zhu Zhu

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>
> The current scheduler enforces a synchronous registration even though the 
> API ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. 
> In the scenario of a remote shuffle service, the communication between the 
> ShuffleMaster and the remote cluster tends to be expensive. A synchronous 
> registration risks blocking the main thread and might cause negative side 
> effects like heartbeat timeouts. Additionally, expensive synchronous 
> invocations to the remote cluster could bottleneck the throughput of 
> applying for shuffle resources, especially for batch jobs with complicated 
> DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22677:

Affects Version/s: 1.14.0

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
>
> The current scheduler enforces a synchronous registration even though the 
> API ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. 
> In the scenario of a remote shuffle service, the communication between the 
> ShuffleMaster and the remote cluster tends to be expensive. A synchronous 
> registration risks blocking the main thread and might cause negative side 
> effects like heartbeat timeouts. Additionally, expensive synchronous 
> invocations to the remote cluster could bottleneck the throughput of 
> applying for shuffle resources, especially for batch jobs with complicated 
> DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-07-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-22677:

Fix Version/s: 1.14.0

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Jin Xing
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.14.0
>
>
> The current scheduler enforces a synchronous registration even though the 
> API ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. 
> In the scenario of a remote shuffle service, the communication between the 
> ShuffleMaster and the remote cluster tends to be expensive. A synchronous 
> registration risks blocking the main thread and might cause negative side 
> effects like heartbeat timeouts. Additionally, expensive synchronous 
> invocations to the remote cluster could bottleneck the throughput of 
> applying for shuffle resources, especially for batch jobs with complicated 
> DAGs;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-30 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-22945.
---
Resolution: Fixed

Fixed via:
master: 5badc356abdcbb3d5cae1fe3f00f1ec18f414d98
1.13: 2d229fc6521b4fc924a4a66347d71b72a1455f77


> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Assignee: Gen Luo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, even though all vertices will be 
> canceled and their assigned slots returned. A returned slot may then be used 
> to fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, and the slot is returned again and used to fulfill another 
> vertex in a recursive way. A StackOverflow can happen this way when there 
> are many vertices, and the resulting fatal error can make the JM crash. A 
> sample call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call chain of slot assignment to avoid 
> similar StackOverflowExceptions.
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
>  

[jira] [Updated] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-06-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23172:

Priority: Major  (was: Minor)

> Links of restart strategy in configuration page is broken
> -
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> The links in the Fault Tolerance section of [the configuration 
> page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  are broken. Currently the link refers to 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
>  which doesn't exist and leads to a 404 error. The correct link is 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-06-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-23172:

Issue Type: Bug  (was: Technical Debt)

> Links of restart strategy in configuration page is broken
> -
>
> Key: FLINK-23172
> URL: https://issues.apache.org/jira/browse/FLINK-23172
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Zhilong Hong
>Priority: Minor
> Fix For: 1.14.0
>
>
> The links in the Fault Tolerance section of [the configuration 
> page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
>  are broken. Currently the link refers to 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
>  which doesn't exist and leads to a 404 error. The correct link is 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23078) Scheduler Benchmarks not compiling

2021-06-29 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-23078.
---
Resolution: Fixed

Fixed via
flink:
439dbfa48122df164780f55da2cb05f64669a247
49fafcaa62cd34a468b6efc14bf49c53f860d7ce

flink-benchmarks:
e44f22bfa314c08b5a15cea932b84a848a6975ec
1331a9d73255b277d3c37bf9f222bfd0c968393b

> Scheduler Benchmarks not compiling
> --
>
> Key: FLINK-23078
> URL: https://issues.apache.org/jira/browse/FLINK-23078
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Assignee: Zhilong Hong
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> {code:java}
> 07:46:50  [ERROR] 
> /home/jenkins/workspace/flink-master-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/scheduler/benchmark/SchedulerBenchmarkBase.java:21:44:
>   error: cannot find symbol
> {code}
> CC [~chesnay] [~Thesharing]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15031) Automatically calculate required network memory for fine-grained jobs

2021-06-29 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371290#comment-17371290
 ] 

Zhu Zhu edited comment on FLINK-15031 at 6/29/21, 10:52 AM:


Discussed with [~trohrmann] offline. His concern was that the network 
configuration can be different in different TMs, and this inconsistency can 
cause the announced and the actually used network memory to differ. To solve 
this problem, we think in the first version we should clearly state in the 
documentation that JM/RM/TMs are required to have the same configuration.

Another thing we should be aware of is that the network memory usage code may 
change and result in requirement inconsistency. To avoid this problem, we 
should extract the current network memory usage code block and let it be 
shared by the requirement calculation and the actual usage.

Regarding the fraction style config, it actually refers to floating buffers 
excluding minimum requirements, which is a bit too hard for users (even 
advanced users) to understand. Given that automatically calculating the 
required network memory is a new feature, we think it's fine in the first 
version to always include enough extra floating buffers in the requirement, 
to avoid performance regression and buffer request timeouts, and then see 
whether users complain about network memory requirements. 

WDYT? [~jinxing6...@126.com]  [~karmagyz]  [~xintongsong]
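
As a concrete illustration of the shared-calculation idea above, a minimal 
sketch could look like the following (class and method names are hypothetical, 
not Flink's actual API; the formula assumed here is the NettyShuffleService 
one from the issue description below). Both the JM-side requirement 
announcement and the TM-side buffer pool sizing would call the same method, so 
the two cannot diverge:

{code:java}
/** Hypothetical shared utility; names are illustrative, not Flink's actual API. */
public final class NetworkBufferRequirements {

    private NetworkBufferRequirements() {}

    /**
     * Single source of truth for how many network buffers a task needs under
     * the NettyShuffleService: exclusive buffers for its input channels plus
     * the buffers for its result partition pool.
     */
    public static int requiredBuffers(
            int numInputChannels, int buffersPerChannel, int numSubpartitions) {
        int exclusiveInputBuffers = numInputChannels * buffersPerChannel;
        int resultPartitionPoolBuffers = numSubpartitions + 1;
        return exclusiveInputBuffers + resultPartitionPoolBuffers;
    }

    /** Converts a buffer count into bytes, given the configured memory segment size. */
    public static long requiredBytes(int numBuffers, int memorySegmentSizeBytes) {
        return (long) numBuffers * memorySegmentSizeBytes;
    }
}
{code}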


was (Author: zhuzh):
Discussed with Till offline. His concern was that the network configuration 
can differ between TMs, and this inconsistency can cause the announced network 
memory to differ from the memory that is actually used. To solve this problem, 
we think the first version should clearly state in the documentation that 
JM/RM/TMs are required to have the same configuration.

Another thing we should be aware of is that the network memory usage code may 
change and result in requirement inconsistency. To avoid this problem, we 
should extract the current network memory usage calculation into a shared code 
block that is used by both the requirement calculation and the actual usage.

Regarding the fraction-style config, it actually refers to floating buffers 
excluding the minimum requirements, which is a bit too hard for users (even 
advanced users) to understand. Given that automatically calculating the 
required network memory is a new feature, we think it's fine for the first 
version to always include enough extra floating buffers in the requirement, to 
avoid performance regressions and buffer request timeouts, and then see 
whether users complain about the network memory requirements. 

WDYT? [~jinxing6...@126.com][~karmagyz][~xintongsong]

> Automatically calculate required network memory for fine-grained jobs
> -
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Jin Xing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> its required resources before using them. In this way, no resource-related 
> error should happen as long as resources are not used beyond what was 
> declared. This ensures a deployed task does not fail due to insufficient 
> resources in the TM, which could otherwise result in unnecessary failures 
> and may even cause a job to hang forever, repeatedly failing to deploy tasks 
> to a TM with insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Tasks 
> require a minimum number of network buffers to work. Currently a task can be 
> deployed to a TM with insufficient network buffers and then fail on launch.
> To avoid that, we should calculate the required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels (i.e. numInputChannel * 
> buffersPerChannel) + required buffers for the result partition buffer 
> pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, there is currently no way to get the required shuffle memory of a 
> task.
> To keep it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory over all {{ExecutionVertex}} 
> instances of the same {{ExecutionJobVertex}}, and the required shuffle memory 
> for a slot sharing group should be the sum of the shuffle memory of each 
> {{ExecutionJobVertex}} instance within it.
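
To make the aggregation rule in the last paragraph concrete, here is a hedged 
sketch (illustrative names, not Flink's actual classes) that takes the 
per-task buffer counts computed with the formula above and derives the 
per-vertex and per-slot-sharing-group requirements:

{code:java}
import java.util.List;

/** Illustrative sketch of the aggregation described above; not Flink's actual API. */
final class ShuffleMemoryEstimator {

    private ShuffleMemoryEstimator() {}

    /**
     * Requirement of one ExecutionJobVertex: the max over the buffer counts
     * of all its ExecutionVertex instances (each computed with the formula above).
     */
    static int buffersForJobVertex(List<Integer> buffersPerExecutionVertex) {
        int max = 0;
        for (int buffers : buffersPerExecutionVertex) {
            max = Math.max(max, buffers);
        }
        return max;
    }

    /**
     * Requirement of a slot sharing group: the sum of the requirements of the
     * ExecutionJobVertex instances sharing the slot.
     */
    static int buffersForSlotSharingGroup(List<Integer> buffersPerJobVertex) {
        int sum = 0;
        for (int buffers : buffersPerJobVertex) {
            sum += buffers;
        }
        return sum;
    }
}
{code}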



