[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500379#comment-16500379 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4949

> Weigh list of preferred locations for scheduling
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Till Rohrmann
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.6.0
>
> [~sihuazhou] proposed to not only use the list of preferred locations to
> decide where to schedule a task, but to also weigh the list according to how
> often a location appeared and then select the location based on the weight.
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
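The weighting in the issue description amounts to counting how often each location occurs in the preferred-locations list and ordering the locations by that count. A minimal Java sketch, assuming locations can be represented as plain strings (in Flink they would be `TaskManagerLocation`s) and that ties keep their first-seen order; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: weigh a preferred-locations list by occurrence count.
// Locations are plain strings here; in Flink they would be TaskManagerLocations.
class WeighPreferredLocations {

    static <T> List<Map.Entry<T, Long>> weigh(List<T> preferredLocations) {
        // Count occurrences; LinkedHashMap keeps first-seen order.
        Map<T, Long> counts = preferredLocations.stream()
            .collect(Collectors.groupingBy(
                Function.identity(), LinkedHashMap::new, Collectors.counting()));

        // Sort by weight, highest first. List.sort is stable, so equal
        // weights keep their first-seen order.
        List<Map.Entry<T, Long>> weighted = new ArrayList<>(counts.entrySet());
        weighted.sort(Map.Entry.<T, Long>comparingByValue().reversed());
        return weighted;
    }

    public static void main(String[] args) {
        List<String> preferred = List.of("location1", "location2", "location2");
        // prints [location2=2, location1=1]
        System.out.println(weigh(preferred));
    }
}
```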
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500371#comment-16500371 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4949

LGTM 👍 Will merge this.
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498035#comment-16498035 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192407287

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
- HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+ Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+ Map preferredFQHostNames = new HashMap<>(locationPreferences.size());
  for (TaskManagerLocation locationPreference : locationPreferences) {
-     preferredResourceIDs.add(locationPreference.getResourceID());
-     preferredFQHostNames.add(locationPreference.getFQDNHostname());
+     Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+     preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+     oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+     preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
  }
  Iterator iterator = candidates.iterator();
- IN matchByHostName = null;
  IN matchByAdditionalRequirements = null;
+ final Map candidateMatchedResults = new HashMap<>();
+
  while (iterator.hasNext()) {
      IN candidate = iterator.next();
      SlotContext slotContext = contextExtractor.apply(candidate);
      // this if checks if the candidate has is a local slot
-     if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) {
+     Integer localWeigh = preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID());
+     if (localWeigh != null) {
          if (additionalRequirementsFilter.test(candidate)) {
-             // we can stop, because we found a match with best possible locality.
-             return resultProducer.apply(candidate, Locality.LOCAL);
+             // we found a match with locality.
+             candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 0));
          } else {
              // next candidate because this failed on the additional requirements.
              continue;
          }
-     }
-
-     // this if checks if the candidate is at least host-local, if we did not find another host-local
-     // candidate before.
-     if (matchByHostName == null) {
-         if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) {
+     } else {
+         // this if checks if the candidate is host-local.
+         Integer hostLocalWeigh = preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname());
+         if (hostLocalWeigh != null) {
              if (additionalRequirementsFilter.test(candidate)) {
-                 // We remember the candidate, but still continue because there might still be a candidate
-                 // that is local to the desired task manager.
-                 matchByHostName = candidate;
+                 // we found a match with host locality.
+                 candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, hostLocalWeigh));
              } else {
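This diff replaces the two `HashSet`s with occurrence-count maps and records, for each candidate, either a local weight (resource ID match) or a host-local weight (host name match). Below is a standalone sketch of that branch structure. It assumes a hypothetical `Location` record in place of `TaskManagerLocation` and a hypothetical `Match` record in place of `CandidateMatchedResult`, and it leaves out the additional-requirements filter:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch mirroring the diff's weighting step, outside Flink.
class LocalityWeights {

    // Hypothetical stand-in for TaskManagerLocation: resource ID + FQDN host name.
    record Location(String resourceId, String fqdnHostname) {}

    // Hypothetical stand-in for CandidateMatchedResult: one of the weights is 0.
    record Match(int localWeight, int hostLocalWeight) {}

    private final Map<String, Integer> resourceIdWeights = new HashMap<>();
    private final Map<String, Integer> hostnameWeights = new HashMap<>();

    LocalityWeights(List<Location> locationPreferences) {
        for (Location preference : locationPreferences) {
            // Same counting pattern as the diff: read old value (default 0), store +1.
            int oldVal = resourceIdWeights.getOrDefault(preference.resourceId(), 0);
            resourceIdWeights.put(preference.resourceId(), oldVal + 1);

            oldVal = hostnameWeights.getOrDefault(preference.fqdnHostname(), 0);
            hostnameWeights.put(preference.fqdnHostname(), oldVal + 1);
        }
    }

    // Local match wins; otherwise try host-local; null if neither applies.
    Match match(Location candidateSlot) {
        Integer localWeight = resourceIdWeights.get(candidateSlot.resourceId());
        if (localWeight != null) {
            return new Match(localWeight, 0);
        }
        Integer hostLocalWeight = hostnameWeights.get(candidateSlot.fqdnHostname());
        if (hostLocalWeight != null) {
            return new Match(0, hostLocalWeight);
        }
        return null;
    }

    public static void main(String[] args) {
        LocalityWeights weights = new LocalityWeights(List.of(
            new Location("tm1", "hostA"),
            new Location("tm1", "hostA"),
            new Location("tm2", "hostA"),
            new Location("tm3", "hostB")));

        System.out.println(weights.match(new Location("tm1", "hostA"))); // Match[localWeight=2, hostLocalWeight=0]
        System.out.println(weights.match(new Location("tm4", "hostA"))); // Match[localWeight=0, hostLocalWeight=3]
        System.out.println(weights.match(new Location("tm5", "hostC"))); // null
    }
}
```

Host-name weights aggregate over every preferred task manager on the same host. In this example, a slot on the unlisted `tm4` still gets host-local weight 3 because `hostA` appears three times.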
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498034#comment-16498034 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192406775

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
- HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+ Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+ Map preferredFQHostNames = new HashMap<>(locationPreferences.size());
  for (TaskManagerLocation locationPreference : locationPreferences) {
-     preferredResourceIDs.add(locationPreference.getResourceID());
-     preferredFQHostNames.add(locationPreference.getFQDNHostname());
+     Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+     preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+     oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+     preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
  }
  Iterator iterator = candidates.iterator();
- IN matchByHostName = null;
  IN matchByAdditionalRequirements = null;
+ final Map candidateMatchedResults = new HashMap<>();
--- End diff --

Checked: just remembering the currently highest-ranked result is enough. 👍
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498020#comment-16498020 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192402396

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
- HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+ Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+ Map preferredFQHostNames = new HashMap<>(locationPreferences.size());
  for (TaskManagerLocation locationPreference : locationPreferences) {
-     preferredResourceIDs.add(locationPreference.getResourceID());
-     preferredFQHostNames.add(locationPreference.getFQDNHostname());
+     Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
--- End diff --

👍 learned!
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497943#comment-16497943 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192384844

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
- HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+ Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+ Map preferredFQHostNames = new HashMap<>(locationPreferences.size());
  for (TaskManagerLocation locationPreference : locationPreferences) {
-     preferredResourceIDs.add(locationPreference.getResourceID());
-     preferredFQHostNames.add(locationPreference.getFQDNHostname());
+     Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+     preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+     oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+     preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
  }
  Iterator iterator = candidates.iterator();
- IN matchByHostName = null;
  IN matchByAdditionalRequirements = null;
+ final Map candidateMatchedResults = new HashMap<>();
--- End diff --

@StefanRRichter thanks for your review. I have forgotten some of the logic of this code by now... I will take a look and think about your comments.
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497923#comment-16497923 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192378853

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
- HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+ Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+ Map preferredFQHostNames = new HashMap<>(locationPreferences.size());
  for (TaskManagerLocation locationPreference : locationPreferences) {
-     preferredResourceIDs.add(locationPreference.getResourceID());
-     preferredFQHostNames.add(locationPreference.getFQDNHostname());
+     Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+     preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+     oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+     preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
  }
  Iterator iterator = candidates.iterator();
- IN matchByHostName = null;
  IN matchByAdditionalRequirements = null;
+ final Map candidateMatchedResults = new HashMap<>();
--- End diff --

Do we even need to keep a map? Is it not enough to just remember the currently highest ranked result?
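Keeping only the best match seen so far, as this review comment suggests, replaces the map with a single variable that is updated in one pass. Below is a sketch that assumes a ranking which compares local weight first and host-local weight second; the quoted diff does not show how `CandidateMatchedResult`s are actually ranked. `Scored` and `pickBest` are hypothetical names:

```java
import java.util.Comparator;
import java.util.List;

// Sketch: track only the highest-ranked candidate instead of a map of all matches.
class BestCandidate {

    // Hypothetical scored candidate; a weight of 0 means "no match at that level".
    record Scored(String slot, int localWeight, int hostLocalWeight) {}

    // Assumed ranking: any local match beats any host-local match,
    // then the higher weight wins within the same level.
    static final Comparator<Scored> RANKING =
        Comparator.comparingInt(Scored::localWeight)
            .thenComparingInt(Scored::hostLocalWeight);

    static Scored pickBest(List<Scored> candidates) {
        Scored best = null;
        for (Scored candidate : candidates) {
            // Strictly greater: on ties the earlier candidate is kept.
            if (best == null || RANKING.compare(candidate, best) > 0) {
                best = candidate;
            }
        }
        return best; // null if there were no candidates
    }

    public static void main(String[] args) {
        List<Scored> candidates = List.of(
            new Scored("slotOnTm4", 0, 3),  // host-local, weight 3
            new Scored("slotOnTm2", 1, 0),  // local, weight 1
            new Scored("slotOnTm1", 2, 0)); // local, weight 2
        System.out.println(pickBest(candidates).slot()); // slotOnTm1
    }
}
```

Memory use is constant instead of growing with the number of matching candidates. Because the comparison is strict, candidate order matters only when two weights are equal.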
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497922#comment-16497922 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192378473

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
- HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+ Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+ Map preferredFQHostNames = new HashMap<>(locationPreferences.size());
  for (TaskManagerLocation locationPreference : locationPreferences) {
-     preferredResourceIDs.add(locationPreference.getResourceID());
-     preferredFQHostNames.add(locationPreference.getFQDNHostname());
+     Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
--- End diff --

We could use `preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum)`, and similar for `preferredFQHostNames`.
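`Map.merge(key, 1, Integer::sum)` stores 1 for an absent key and otherwise replaces the value with the old value plus 1. That makes it a drop-in replacement for the `getOrDefault`/`put` pair in the diff. Here is a small check of that equivalence, with hypothetical class and method names:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Compares the getOrDefault/put counting from the diff with the Map.merge form.
class CountWithMerge {

    static Map<String, Integer> countWithGetOrDefault(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        for (String key : keys) {
            Integer oldVal = counts.getOrDefault(key, 0);
            counts.put(key, oldVal + 1); // lookup and store as two separate calls
        }
        return counts;
    }

    static Map<String, Integer> countWithMerge(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        for (String key : keys) {
            // Absent key: store 1. Present key: store Integer.sum(oldValue, 1).
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> resourceIds = List.of("tm1", "tm2", "tm2");
        System.out.println(countWithGetOrDefault(resourceIds).equals(countWithMerge(resourceIds))); // true
        System.out.println(countWithMerge(resourceIds).get("tm2")); // 2
    }
}
```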
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497910#comment-16497910 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4949#discussion_r192376303

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection preferredResourceIDs = new HashSet<>(locationPreferences.size());
- HashSet preferredFQHostNames = new HashSet<>(locationPreferences.size());
+ Map preferredResourceIDs = new HashMap<>(locationPreferences.size());
+ Map preferredFQHostNames = new HashMap<>(locationPreferences.size());
  for (TaskManagerLocation locationPreference : locationPreferences) {
-     preferredResourceIDs.add(locationPreference.getResourceID());
-     preferredFQHostNames.add(locationPreference.getFQDNHostname());
+     Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+     preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+     oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+     preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
  }
  Iterator iterator = candidates.iterator();
- IN matchByHostName = null;
  IN matchByAdditionalRequirements = null;
+ final Map candidateMatchedResults = new HashMap<>();
+
  while (iterator.hasNext()) {
      IN candidate = iterator.next();
      SlotContext slotContext = contextExtractor.apply(candidate);
      // this if checks if the candidate has is a local slot
-     if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) {
+     Integer localWeigh = preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID());
+     if (localWeigh != null) {
          if (additionalRequirementsFilter.test(candidate)) {
-             // we can stop, because we found a match with best possible locality.
-             return resultProducer.apply(candidate, Locality.LOCAL);
+             // we found a match with locality.
+             candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 0));
          } else {
              // next candidate because this failed on the additional requirements.
              continue;
          }
-     }
-
-     // this if checks if the candidate is at least host-local, if we did not find another host-local
-     // candidate before.
-     if (matchByHostName == null) {
-         if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) {
+     } else {
+         // this if checks if the candidate is host-local.
+         Integer hostLocalWeigh = preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname());
+         if (hostLocalWeigh != null) {
              if (additionalRequirementsFilter.test(candidate)) {
-                 // We remember the candidate, but still continue because there might still be a candidate
-                 // that is local to the desired task manager.
-                 matchByHostName = candidate;
+                 // we found a match with host locality.
+                 candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, hostLocalWeigh));
              } else {
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380666#comment-16380666 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/4949

@tillrohrmann I have updated the PR ... Could you please have a look at it now, or should we wait until the 1.5 release is out?
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380288#comment-16380288 ]

Sihua Zhou commented on FLINK-7866:
---

[~till.rohrmann] Got it. Since the 1.5 release branch has already been created, I think this feature can't get into 1.5 anymore, because it is an improvement rather than a bug fix. I will update the PR after 1.5 is out and ping you then ;).
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380236#comment-16380236 ]

Till Rohrmann commented on FLINK-7866:
--

Hi [~sihuazhou], it still makes sense to add this feature. But you're right that we would have to rebase it onto the latest master. I think the change would now have to go into {{SlotProfile#LocalityAwareRequirementsToSlotMatcher}}.
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380199#comment-16380199 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/4949

@tillrohrmann Do you think this is still an issue? I found that the code is badly outdated. If you think this should still be addressed, I would like to update the PR; otherwise I'd like to close it.
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241691#comment-16241691 ]

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/4949

Hi @tillrohrmann, could you please have a rough look at this? I'm not sure this is what you wanted.
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239972#comment-16239972 ]

ASF GitHub Bot commented on FLINK-7866:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/4949

[FLINK-7866] [runtime] Weigh list of preferred locations for scheduling

## What is the purpose of the change

This PR fixes [FLINK-7866](https://issues.apache.org/jira/browse/FLINK-7866). Currently, the scheduler only uses the list of preferred locations to decide where to schedule a task. This can be improved by weighing the locations. That way, we would obtain better locality in some cases. Moreover, this PR introduces `CandidateLocation` and `CandidateLocationEvaluator`, which let us weigh the locations of an `ExecutionVertex` by both state and input.

A simple weighing example:

- Preferred locations list: {{[location1, location2, location2]}}
- Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}

## Brief change log

- *Add `CandidateLocation` to represent a possible preferred location*
- *Add `CandidateLocationEvaluator` to evaluate a candidate location. Currently there are only the `INPUT_ONLY` and `STATE_ONLY` evaluators, but this can easily be extended*
- *Add evaluation logic when allocating a slot for an `Execution`: it first gets a set of candidate locations, then measures them with the evaluator, and finally returns a location list ordered by weight, highest first*

## Verifying this change

This change added tests and can be verified as follows:

- Added the `testCandidateLocationEvaluateResult` test in `ExecutionTest` to verify the evaluation logic.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

- Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink weigh_location

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4949.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4949

commit 4b867961d1c2061372145ab74f5b3e88b177b91a
Author: summerleafs
Date: 2017-11-06T05:43:25Z

introduce CandidateLocation and CandidateLocationEvaluator for weigh the preferred locations.
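The change log describes scoring candidate locations with an evaluator and returning them ordered by weight, highest first. Below is a sketch of that shape. It is not the PR's code: the `CandidateLocation` fields, the `evaluate` method and the scoring used by `INPUT_ONLY` and `STATE_ONLY` are all assumptions:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of evaluator-based location weighting. The type names
// follow the PR description; fields and method signatures are assumptions.
class CandidateLocations {

    // A possible preferred location, with how many inputs / state handles point to it.
    record CandidateLocation(String location, int inputCount, int stateCount) {}

    enum CandidateLocationEvaluator {
        INPUT_ONLY {
            @Override
            int evaluate(CandidateLocation c) { return c.inputCount(); }
        },
        STATE_ONLY {
            @Override
            int evaluate(CandidateLocation c) { return c.stateCount(); }
        };

        abstract int evaluate(CandidateLocation candidate);
    }

    // Returns the locations ordered by evaluated weight, highest first.
    static List<String> rank(List<CandidateLocation> candidates, CandidateLocationEvaluator evaluator) {
        List<CandidateLocation> sorted = new ArrayList<>(candidates);
        // Descending by weight; List.sort is stable, so ties keep input order.
        sorted.sort((a, b) -> Integer.compare(evaluator.evaluate(b), evaluator.evaluate(a)));
        List<String> result = new ArrayList<>();
        for (CandidateLocation c : sorted) {
            result.add(c.location());
        }
        return result;
    }

    public static void main(String[] args) {
        List<CandidateLocation> candidates = List.of(
            new CandidateLocation("location1", 1, 2),
            new CandidateLocation("location2", 2, 0));
        System.out.println(rank(candidates, CandidateLocationEvaluator.INPUT_ONLY)); // [location2, location1]
        System.out.println(rank(candidates, CandidateLocationEvaluator.STATE_ONLY)); // [location1, location2]
    }
}
```

An enum with constant-specific bodies keeps each evaluation strategy in one place. A new strategy, for example one that combines input and state counts, would be another constant.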
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237111#comment-16237111 ] Sihua Zhou commented on FLINK-7866: --- [~till.rohrmann] thanks and so happy for thinking of me when you plan to revision scheduler, I do have some thought about a properly scheduler, works incrementally is a nice requirement as you have explained. IMO, it also need the follow features. 1, Strong Extendibility. This means that scheduler should be easy to extends for other `schedule-factor` not only just for state & inputs, E.g: TM's status and cluster's status. 2, Consider the Runtime Information of the job. This means that when do scheduling we need to consider the previous runtime information of the execution, the runtime information should contains but only `task manager location`, `state size`, `input flow rate`, `thoughtput`, I think these will be helpful for scheduler. For example, imagine that vertex `A`,`B` both connect to vertex 'C' with `forward` and if A's `thoughtput` was `1M \ s` and B's `thoughput` was `100M\s`, than B's slot will be picked for 'C'. Currently, runtime information can only be filled when we do recover, is there a chance that the scheduler can Self-Regulating, dynamic change the schdule result. Ah, what I want to express is runtime information of execution is helpful. I am looking forward to your plan and interest in it :) > Weigh list of preferred locations for scheduling > > > Key: FLINK-7866 > URL: https://issues.apache.org/jira/browse/FLINK-7866 > Project: Flink > Issue Type: Improvement > Components: Scheduler >Affects Versions: 1.4.0, 1.3.2 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > [~sihuazhou] proposed to not only use the list of preferred locations to > decide where to schedule a task, but to also weigh the list according to how > often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases. > Example: > Preferred locations list: {{[location1, location2, location2]}} > Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
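The weighting proposed in the issue description can be illustrated with a small Java sketch. It is not Flink's actual scheduler code: the class `WeightedPreferredLocations`, the record `WeightedLocation`, and the methods `weigh` and `weighByCount` are hypothetical names, and locations are modelled as plain strings for simplicity. Each occurrence of a location adds its weight to that location's total, and the result is ordered by descending total.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of weighing preferred locations; not Flink's actual API.
final class WeightedPreferredLocations {

    /** One entry of the weighted list, e.g. (location2, 2). */
    record WeightedLocation(String location, long weight) {}

    /**
     * Sums the weight of every occurrence per location and returns the locations
     * ordered by descending total weight. Ties keep first-appearance order because
     * LinkedHashMap preserves insertion order and List.sort is stable.
     */
    static List<WeightedLocation> weigh(List<String> locations, List<Long> weights) {
        if (locations.size() != weights.size()) {
            throw new IllegalArgumentException("locations and weights must have the same size");
        }
        Map<String, Long> totals = new LinkedHashMap<>();
        for (int i = 0; i < locations.size(); i++) {
            totals.merge(locations.get(i), weights.get(i), Long::sum);
        }
        List<WeightedLocation> result = new ArrayList<>();
        totals.forEach((location, total) -> result.add(new WeightedLocation(location, total)));
        result.sort((a, b) -> Long.compare(b.weight(), a.weight()));
        return result;
    }

    /** The variant from the issue: every occurrence counts as weight 1. */
    static List<WeightedLocation> weighByCount(List<String> locations) {
        return weigh(locations, Collections.nCopies(locations.size(), 1L));
    }
}
```

With `weighByCount` on `[location1, location2, location2]` this yields `[(location2, 2), (location1, 1)]`, matching the issue's example. Passing a per-input metric such as throughput as the weight, e.g. `weigh(List.of("slotA", "slotB"), List.of(1L, 100L))`, would roughly model the throughput-based preference suggested in Sihua Zhou's comment, ranking `slotB` first; whether such runtime metrics are available to the scheduler is an assumption.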
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235869#comment-16235869 ] Till Rohrmann commented on FLINK-7866: -- [~sihuazhou] after addressing this issue, it might make sense to create a dedicated Flink improvement proposal for further scheduling changes. I think Flink's scheduling algorithm will need some revision if we want to properly include state locality preferences in our scheduling decisions. The first thing to do would be to collect all the requirements we have for a proper scheduling algorithm. The next step would be to think about an algorithm that can calculate a good matching between the different requirements. Ideally, this algorithm works incrementally so that we can interrupt it at any given point in time if it takes too long. What do you think?
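To make the "interruptible at any point" requirement concrete, here is a hypothetical Java sketch of an anytime greedy matcher. Nothing here is an existing Flink API: `IncrementalMatcher`, `Task`, and `assign` are illustrative names, and per-location slot counts stand in for real slot offers. Tasks are placed one at a time on the free location they weigh highest, and the time budget is checked between steps; once it runs out, the remaining tasks are placed on any free slot, so a complete (if less local) assignment is always returned.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an "anytime" matcher; not part of Flink's scheduler API.
final class IncrementalMatcher {

    /** Hypothetical task: an id plus a preference weight per location. */
    record Task(String id, Map<String, Long> preferences) {}

    /**
     * Assigns every task to a location with a free slot. While the time budget lasts,
     * each task goes to its highest-weighted free location; after that, leftovers are
     * placed on any free slot so the result is always complete.
     */
    static Map<String, String> assign(List<Task> tasks, Map<String, Integer> freeSlots, long budgetNanos) {
        long start = System.nanoTime();
        Map<String, Integer> free = new HashMap<>(freeSlots);
        Map<String, String> assignment = new HashMap<>();
        Deque<Task> pending = new ArrayDeque<>(tasks);

        // Phase 1: locality-aware greedy steps; the budget is checked between steps.
        while (!pending.isEmpty() && System.nanoTime() - start < budgetNanos) {
            Task task = pending.poll();
            String best = null;
            long bestWeight = Long.MIN_VALUE;
            for (Map.Entry<String, Long> pref : task.preferences().entrySet()) {
                if (free.getOrDefault(pref.getKey(), 0) > 0 && pref.getValue() > bestWeight) {
                    best = pref.getKey();
                    bestWeight = pref.getValue();
                }
            }
            // No preferred location has a free slot: fall back to any free slot.
            take(free, best != null ? best : anyFree(free), task, assignment);
        }

        // Phase 2: runs only if the budget was exhausted; places leftovers ignoring locality.
        while (!pending.isEmpty()) {
            take(free, anyFree(free), pending.poll(), assignment);
        }
        return assignment;
    }

    private static String anyFree(Map<String, Integer> free) {
        for (Map.Entry<String, Integer> e : free.entrySet()) {
            if (e.getValue() > 0) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("not enough free slots");
    }

    private static void take(Map<String, Integer> free, String location, Task task,
                             Map<String, String> assignment) {
        free.merge(location, -1, Integer::sum); // consume one slot at this location
        assignment.put(task.id(), location);
    }
}
```

Greedy placement is only one possible strategy, chosen here because every step leaves a valid partial result that can be completed cheaply; a real proposal might instead run an incremental optimization over all collected requirements.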
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235403#comment-16235403 ] Till Rohrmann commented on FLINK-7866: -- Sure, go ahead.
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235215#comment-16235215 ] Sihua Zhou commented on FLINK-7866: --- Hi [~till.rohrmann], I saw you have opened a PR for [FLINK-7153|https://issues.apache.org/jira/browse/FLINK-7153]. Can I take this ticket now?
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212318#comment-16212318 ] Till Rohrmann commented on FLINK-7866: -- I haven't worked on this, [~sihuazhou], but I think this issue is blocked by FLINK-7153.
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210619#comment-16210619 ] ASF GitHub Bot commented on FLINK-7866: --- Github user summerleafs commented on the issue: https://github.com/apache/flink/pull/4369 Duplicate of FLINK-7866.
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210618#comment-16210618 ] Sihua Zhou commented on FLINK-7866: --- [~till.rohrmann] have you already worked on this? If not, can I take this ticket?