[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500379#comment-16500379
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4949


> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}
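
The weighting described in the issue can be sketched as follows. This is only a minimal illustration under stated assumptions, not Flink code: plain strings stand in for task manager locations, and the class and method names (`WeightedLocationsSketch`, `weigh`) are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WeightedLocationsSketch {

    // Counts how often each location occurs and returns the entries
    // ordered by descending count (the "weight").
    static List<Map.Entry<String, Integer>> weigh(List<String> preferredLocations) {
        // LinkedHashMap keeps first-appearance order, which makes ties deterministic.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String location : preferredLocations) {
            counts.merge(location, 1, Integer::sum);
        }
        List<Map.Entry<String, Integer>> weighted = new ArrayList<>(counts.entrySet());
        // List.sort is stable, so equally weighted locations keep their original order.
        weighted.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return weighted;
    }

    public static void main(String[] args) {
        // Same input as the example in the issue description.
        System.out.println(weigh(Arrays.asList("location1", "location2", "location2")));
        // prints [location2=2, location1=1]
    }
}
```

A scheduler using such a list would try the head of the list first and fall back to lower-weighted locations if no slot is available there.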



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500371#comment-16500371
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4949
  
LGTM 👍 Will merge this.




[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498035#comment-16498035
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192407287
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
-   HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+   Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   Map preferredFQHostNames = new HashMap<>(locationPreferences.size());

    for (TaskManagerLocation locationPreference : locationPreferences) {
-       preferredResourceIDs.add(locationPreference.getResourceID());
-       preferredFQHostNames.add(locationPreference.getFQDNHostname());
+       Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+       preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+       oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+       preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
    }

    Iterator iterator = candidates.iterator();

-   IN matchByHostName = null;
    IN matchByAdditionalRequirements = null;

+   final Map candidateMatchedResults = new HashMap<>();
+
    while (iterator.hasNext()) {

        IN candidate = iterator.next();
        SlotContext slotContext = contextExtractor.apply(candidate);

        // this if checks if the candidate has is a local slot
-       if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) {
+       Integer localWeigh = preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID());
+       if (localWeigh != null) {
            if (additionalRequirementsFilter.test(candidate)) {
-               // we can stop, because we found a match with best possible locality.
-               return resultProducer.apply(candidate, Locality.LOCAL);
+               // we found a match with locality.
+               candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 0));
            } else {
                // next candidate because this failed on the additional requirements.
                continue;
            }
-       }
-
-       // this if checks if the candidate is at least host-local, if we did not find another host-local
-       // candidate before.
-       if (matchByHostName == null) {
-           if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) {
+       } else {
+           // this if checks if the candidate is host-local.
+           Integer hostLocalWeigh = preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname());
+           if (hostLocalWeigh != null) {
                if (additionalRequirementsFilter.test(candidate)) {
-                   // We remember the candidate, but still continue because there might still be a candidate
-                   // that is local to the desired task manager.
-                   matchByHostName = candidate;
+                   // we found a match with host locality.
+                   candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, hostLocalWeigh));
                } else {

[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498034#comment-16498034
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192406775
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
-   HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+   Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   Map preferredFQHostNames = new HashMap<>(locationPreferences.size());

    for (TaskManagerLocation locationPreference : locationPreferences) {
-       preferredResourceIDs.add(locationPreference.getResourceID());
-       preferredFQHostNames.add(locationPreference.getFQDNHostname());
+       Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+       preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+       oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+       preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
    }

    Iterator iterator = candidates.iterator();

-   IN matchByHostName = null;
    IN matchByAdditionalRequirements = null;

+   final Map candidateMatchedResults = new HashMap<>();
--- End diff --

Checked: just remembering the currently highest-ranked result is enough. 👍




[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498020#comment-16498020
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192402396
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
-   HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+   Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   Map preferredFQHostNames = new HashMap<>(locationPreferences.size());

    for (TaskManagerLocation locationPreference : locationPreferences) {
-       preferredResourceIDs.add(locationPreference.getResourceID());
-       preferredFQHostNames.add(locationPreference.getFQDNHostname());
+       Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
--- End diff --

👍 learned!




[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497943#comment-16497943
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192384844
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
-   HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+   Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   Map preferredFQHostNames = new HashMap<>(locationPreferences.size());

    for (TaskManagerLocation locationPreference : locationPreferences) {
-       preferredResourceIDs.add(locationPreference.getResourceID());
-       preferredFQHostNames.add(locationPreference.getFQDNHostname());
+       Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+       preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+       oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+       preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
    }

    Iterator iterator = candidates.iterator();

-   IN matchByHostName = null;
    IN matchByAdditionalRequirements = null;

+   final Map candidateMatchedResults = new HashMap<>();
--- End diff --

@StefanRRichter thanks for your review. I have somewhat forgotten the logic
of this code by now... I will take a look and think about your comments.




[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497923#comment-16497923
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192378853
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
-   HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+   Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   Map preferredFQHostNames = new HashMap<>(locationPreferences.size());

    for (TaskManagerLocation locationPreference : locationPreferences) {
-       preferredResourceIDs.add(locationPreference.getResourceID());
-       preferredFQHostNames.add(locationPreference.getFQDNHostname());
+       Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+       preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+       oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+       preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
    }

    Iterator iterator = candidates.iterator();

-   IN matchByHostName = null;
    IN matchByAdditionalRequirements = null;

+   final Map candidateMatchedResults = new HashMap<>();
--- End diff --

Do we even need to keep a map? Is it not enough to just remember the 
currently highest ranked result?
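
The suggestion to track only the best result instead of collecting every match in a map could look roughly like this. It is a hedged sketch, not the code from the pull request: `Candidate`, `isBetter`, and `pickBest` are hypothetical names, and the ordering (task-manager-local weight first, host-local weight as tie-breaker) is an assumption about how matches would be ranked.

```java
import java.util.Arrays;
import java.util.List;

final class BestCandidateSketch {

    /** Hypothetical stand-in for a slot candidate together with its two locality weights. */
    static final class Candidate {
        final String name;
        final int localWeight;
        final int hostLocalWeight;

        Candidate(String name, int localWeight, int hostLocalWeight) {
            this.name = name;
            this.localWeight = localWeight;
            this.hostLocalWeight = hostLocalWeight;
        }
    }

    // Assumed ranking: a higher local weight wins; host-local weight only breaks ties.
    static boolean isBetter(Candidate a, Candidate b) {
        if (a.localWeight != b.localWeight) {
            return a.localWeight > b.localWeight;
        }
        return a.hostLocalWeight > b.hostLocalWeight;
    }

    // Single pass that remembers only the highest-ranked candidate seen so far.
    static Candidate pickBest(List<Candidate> candidates) {
        Candidate best = null;
        for (Candidate candidate : candidates) {
            if (best == null || isBetter(candidate, best)) {
                best = candidate;
            }
        }
        return best; // null if there were no candidates
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
                new Candidate("host-local-only", 0, 3),
                new Candidate("local-twice", 2, 0),
                new Candidate("local-once", 1, 0));
        System.out.println(pickBest(candidates).name); // prints local-twice
    }
}
```

Compared with filling a map and searching it afterwards, this needs constant extra memory and no second pass.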




[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497922#comment-16497922
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192378473
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
-   HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+   Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   Map preferredFQHostNames = new HashMap<>(locationPreferences.size());

    for (TaskManagerLocation locationPreference : locationPreferences) {
-       preferredResourceIDs.add(locationPreference.getResourceID());
-       preferredFQHostNames.add(locationPreference.getFQDNHostname());
+       Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
--- End diff --

We could use 
`preferredResourceIDs.merge(locationPreference.getResourceID(), 1, 
Integer::sum)`, and similar for `preferredFQHostNames`.
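
For comparison, the two counting idioms are shown side by side below. This is a small standalone sketch, assuming plain `String` keys in place of `ResourceID`; the class and method names are invented for the example. `Map.merge` and `Map.getOrDefault` are standard `java.util.Map` methods since Java 8.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MergeCountSketch {

    // Counting as in the diff: read the old value (default 0), then write old + 1.
    static Map<String, Integer> countWithGetOrDefault(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>(keys.size());
        for (String key : keys) {
            Integer oldVal = counts.getOrDefault(key, 0);
            counts.put(key, oldVal + 1);
        }
        return counts;
    }

    // Counting with Map.merge: stores 1 if the key is absent,
    // otherwise stores Integer.sum(oldValue, 1).
    static Map<String, Integer> countWithMerge(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>(keys.size());
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("tm-1", "tm-2", "tm-2");
        // Both variants produce the same counts.
        System.out.println(countWithGetOrDefault(ids).equals(countWithMerge(ids))); // prints true
    }
}
```

The `merge` variant does the lookup and update in one call and avoids the temporary `oldVal` variable.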




[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497910#comment-16497910
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192376303
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
-   HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+   Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   Map preferredFQHostNames = new HashMap<>(locationPreferences.size());

    for (TaskManagerLocation locationPreference : locationPreferences) {
-       preferredResourceIDs.add(locationPreference.getResourceID());
-       preferredFQHostNames.add(locationPreference.getFQDNHostname());
+       Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+       preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+       oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+       preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
    }

    Iterator iterator = candidates.iterator();

-   IN matchByHostName = null;
    IN matchByAdditionalRequirements = null;

+   final Map candidateMatchedResults = new HashMap<>();
+
    while (iterator.hasNext()) {

        IN candidate = iterator.next();
        SlotContext slotContext = contextExtractor.apply(candidate);

        // this if checks if the candidate has is a local slot
-       if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) {
+       Integer localWeigh = preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID());
+       if (localWeigh != null) {
            if (additionalRequirementsFilter.test(candidate)) {
-               // we can stop, because we found a match with best possible locality.
-               return resultProducer.apply(candidate, Locality.LOCAL);
+               // we found a match with locality.
+               candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 0));
            } else {
                // next candidate because this failed on the additional requirements.
                continue;
            }
-       }
-
-       // this if checks if the candidate is at least host-local, if we did not find another host-local
-       // candidate before.
-       if (matchByHostName == null) {
-           if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) {
+       } else {
+           // this if checks if the candidate is host-local.
+           Integer hostLocalWeigh = preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname());
+           if (hostLocalWeigh != null) {
                if (additionalRequirementsFilter.test(candidate)) {
-                   // We remember the candidate, but still continue because there might still be a candidate
-                   // that is local to the desired task manager.
-                   matchByHostName = candidate;
+                   // we found a match with host locality.
+                   candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, hostLocalWeigh));
                } else {
   

[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380666#comment-16380666
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/4949
  
@tillrohrmann I have updated the PR ... Could you please have a look at
this now, or should we just wait until the 1.5 release is out?


> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}





[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-02-28 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380288#comment-16380288
 ] 

Sihua Zhou commented on FLINK-7866:
---

[~till.rohrmann] Got it. Since the release branch for 1.5 has already been
created, I think this feature can't get into 1.5 now, because it's not a bug
fix but an improvement. I will update the PR after 1.5 is out and ping you at
that time ;).



[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-02-28 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380236#comment-16380236
 ] 

Till Rohrmann commented on FLINK-7866:
--

Hi [~sihuazhou], it still makes sense to add this feature. But you're right
that we would have to rebase it onto the latest master. The change would now
have to go into {{SlotProfile#LocalityAwareRequirementsToSlotMatcher}}, I think.



[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380199#comment-16380199
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/4949
  
@tillrohrmann Do you think this is still an issue? I found that the code is
quite outdated. If you think this should still be addressed, I would like to
update the PR; otherwise I'd like to close it.




[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241691#comment-16241691
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/4949
  
Hi @tillrohrmann, could you please have a rough look at this? I'm not sure
this is what you wanted.


> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
> Fix For: 1.5.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-11-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239972#comment-16239972
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/4949

[FLINK-7866] [runtime] Weigh list of preferred locations for scheduling

## What is the purpose of the change

This PR fixes
[FLINK-7866](https://issues.apache.org/jira/browse/FLINK-7866). Currently, the
scheduler only uses the list of preferred locations to decide where to schedule
a task; this can be optimized by weighing the locations. That way, we would
obtain better locality in some cases. Moreover, this PR also introduces
`CandidateLocation` and `CandidateLocationEvaluator` to enable us to weigh
locations for an `ExecutionVertex` by both state and input.

A simple weighing example:
- Preferred locations list: {{[location1, location2, location2]}}
- Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}


## Brief change log

  - *Add CandidateLocation to represent a possible preferred location*
  - *Add CandidateLocationEvaluator to evaluate a candidate location;
currently there are only INPUT_ONLY and STATE_ONLY evaluators, but this can
easily be extended*
  - *Add evaluation logic when allocating a slot for an `Execution`: it first
gets a set of candidate locations, which are then scored by the evaluator, and
finally returns a list of locations ordered by weighted result, descending*
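
The evaluation flow in the change log (collect candidates, score them with an evaluator, return them ordered by score descending) might be sketched as below. Everything here is an assumption made for illustration: the fields on `Candidate`, the way `INPUT_ONLY` and `STATE_ONLY` compute a score, and the `rank` helper are hypothetical and are not taken from the pull request's `CandidateLocation` or `CandidateLocationEvaluator` classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class CandidateEvaluationSketch {

    /** Hypothetical candidate: a location plus the facts an evaluator may look at. */
    static final class Candidate {
        final String location;
        final int inputCount;   // assumed: number of inputs produced at this location
        final long stateBytes;  // assumed: size of previous state stored at this location

        Candidate(String location, int inputCount, long stateBytes) {
            this.location = location;
            this.inputCount = inputCount;
            this.stateBytes = stateBytes;
        }
    }

    /** Hypothetical evaluators; new strategies would be added as further constants. */
    enum Evaluator {
        INPUT_ONLY {
            @Override
            long score(Candidate c) {
                return c.inputCount;
            }
        },
        STATE_ONLY {
            @Override
            long score(Candidate c) {
                return c.stateBytes;
            }
        };

        abstract long score(Candidate c);
    }

    // Scores every candidate and returns the locations ordered by score, highest first.
    static List<String> rank(List<Candidate> candidates, Evaluator evaluator) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(Comparator.comparingLong((Candidate c) -> evaluator.score(c)).reversed());
        List<String> locations = new ArrayList<>(sorted.size());
        for (Candidate c : sorted) {
            locations.add(c.location);
        }
        return locations;
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
                new Candidate("tm-1", 1, 500L),
                new Candidate("tm-2", 2, 100L));
        System.out.println(rank(candidates, Evaluator.INPUT_ONLY)); // prints [tm-2, tm-1]
        System.out.println(rank(candidates, Evaluator.STATE_ONLY)); // prints [tm-1, tm-2]
    }
}
```

Keeping the scoring behind a small interface-like type is what would make it easy to add further evaluators later, for example one that combines input and state.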

## Verifying this change

This change added tests and can be verified as follows:
- Added the `testCandidateLocationEvaluateResult` test in `ExecutionTest` to
verify the evaluation logic.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink weigh_location

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4949


commit 4b867961d1c2061372145ab74f5b3e88b177b91a
Author: summerleafs 
Date:   2017-11-06T05:43:25Z

introduce CandidateLocation and CandidateLocationEvaluator for weigh the 
preferred locations.




> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
> Fix For: 1.5.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}
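
The weighting in the example above amounts to counting how often each location occurs and sorting by that count. A minimal sketch, assuming locations can be represented as plain strings (the class and method names here are hypothetical and not taken from Flink):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WeightedLocations {

    /** Counts how often each location appears and returns the entries sorted by count, descending. */
    public static List<Map.Entry<String, Integer>> weigh(List<String> preferredLocations) {
        // LinkedHashMap keeps first-seen order, so locations with equal counts keep their input order.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String location : preferredLocations) {
            counts.merge(location, 1, Integer::sum);
        }
        List<Map.Entry<String, Integer>> weighted = new ArrayList<>(counts.entrySet());
        // List.sort is stable, so ties are not reordered.
        weighted.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return weighted;
    }

    public static void main(String[] args) {
        System.out.println(weigh(Arrays.asList("location1", "location2", "location2")));
    }
}
```

A scheduler could then try the locations in this order and fall back to the next entry when no slot is free at the preferred one.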



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-11-02 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237111#comment-16237111
 ] 

Sihua Zhou commented on FLINK-7866:
---

[~till.rohrmann] Thanks, I'm happy that you thought of me while planning to 
revise the scheduler. I do have some thoughts about a proper scheduler. Working 
incrementally is a good requirement, as you have explained. IMO, it also needs 
the following features.

1. Strong extensibility.
The scheduler should be easy to extend with other `schedule-factor`s, 
not just state and inputs, e.g. the TM's status and the cluster's status.

2. Consider the runtime information of the job.
When scheduling, we need to consider the previous runtime information of the 
execution. The runtime information should contain, but not be limited to, 
`task manager location`, `state size`, `input flow rate`, and `throughput`. 
I think these will be helpful for the scheduler. For example, imagine that 
vertices `A` and `B` both connect to vertex `C` with `forward`. If A's 
`throughput` was `1M/s` and B's `throughput` was `100M/s`, then B's slot will 
be picked for `C`. Currently, runtime information can only be filled in when we 
recover. Is there a chance that the scheduler could be self-regulating and 
dynamically change the scheduling result? What I want to express is that the 
runtime information of an execution is helpful.
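
The throughput example above could look roughly like this. It is only a sketch of the idea: the class name, the map from slot location to observed throughput, and the unit are assumptions made for illustration, not an existing Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ThroughputPreference {

    /**
     * Picks the location of the upstream vertex with the highest observed throughput.
     * Returns null when no upstream information is available.
     */
    public static String pickLocation(Map<String, Double> throughputByLocation) {
        String best = null;
        double bestThroughput = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Double> entry : throughputByLocation.entrySet()) {
            if (entry.getValue() > bestThroughput) {
                bestThroughput = entry.getValue();
                best = entry.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical numbers matching the example: A at 1M/s, B at 100M/s.
        Map<String, Double> observed = new LinkedHashMap<>();
        observed.put("slot-of-A", 1.0);
        observed.put("slot-of-B", 100.0);
        System.out.println(pickLocation(observed));
    }
}
```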

I am looking forward to your plan and am interested in it :)

> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Till Rohrmann
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.5.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}





[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-11-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235869#comment-16235869
 ] 

Till Rohrmann commented on FLINK-7866:
--

[~sihuazhou] after addressing this issue, it might make sense to create a 
dedicated Flink improvement proposal for further scheduling changes. I think 
Flink's scheduling algorithm will need some revision if we want to properly 
include state locality preferences in our scheduling decision.

The first thing to do would be to collect all the requirements we have for a 
proper scheduling algorithm. The next thing would be to think about an 
algorithm that can calculate a good matching between the different 
requirements. Ideally, this algorithm works incrementally so that we can 
interrupt it at any given point in time if it takes too long.

What do you think?



[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-11-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235403#comment-16235403
 ] 

Till Rohrmann commented on FLINK-7866:
--

Sure, go ahead.

> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Till Rohrmann
> Priority: Major
> Fix For: 1.5.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}





[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-11-01 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235215#comment-16235215
 ] 

Sihua Zhou commented on FLINK-7866:
---

Hi [~till.rohrmann], I saw that you have opened a PR for 
[FLINK-7153|https://issues.apache.org/jira/browse/FLINK-7153]. Can I take this 
ticket now?



[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-10-20 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212318#comment-16212318
 ] 

Till Rohrmann commented on FLINK-7866:
--

I haven't worked on this, [~sihuazhou]. But I think this issue is blocked by 
FLINK-7153.

> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Till Rohrmann
> Fix For: 1.5.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}





[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210619#comment-16210619
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user summerleafs commented on the issue:

https://github.com/apache/flink/pull/4369
  
Duplicate of FLINK-7866.




[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-10-18 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210618#comment-16210618
 ] 

Sihua Zhou commented on FLINK-7866:
---

[~till.rohrmann] Have you already started working on this? If not, can I take this ticket?
