Repository: flink
Updated Branches:
  refs/heads/master d38154129 -> 5374ba97c
[FLINK-2225] [scheduler] Excludes static code paths from co-location constraint to avoid scheduling problems This closes #843. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5374ba97 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5374ba97 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5374ba97 Branch: refs/heads/master Commit: 5374ba97cd863af3062c9b6fa9f96c4b4ebe9a0a Parents: d381541 Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Jun 16 12:27:02 2015 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Jun 16 23:36:48 2015 +0200 ---------------------------------------------------------------------- .../optimizer/plantranslate/JobGraphGenerator.java | 5 +++-- .../org/apache/flink/runtime/instance/Instance.java | 5 +++++ .../org/apache/flink/runtime/instance/Slot.java | 4 ++-- .../instance/SlotSharingGroupAssignment.java | 7 +++++++ .../runtime/jobmanager/scheduler/Scheduler.java | 16 +++++++++------- 5 files changed, 26 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 109be20..281e425 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -527,8 +527,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> { if (this.currentIteration != null) { JobVertex head = 
this.iterations.get(this.currentIteration).getHeadTask(); - // the head may still be null if we descend into the static parts first - if (head != null) { + // Exclude static code paths from the co-location constraint, because otherwise + // their execution determines the deployment slots of the co-location group + if (node.isOnDynamicPath()) { targetVertex.setStrictlyCoLocatedWith(head); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index 7b48693..39caf08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -29,6 +29,8 @@ import akka.actor.ActorRef; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager} @@ -36,6 +38,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener; */ public class Instance { + private final static Logger LOG = LoggerFactory.getLogger(Instance.class); + /** The lock on which to synchronize allocations and failure state changes */ private final Object instanceLock = new Object(); @@ -286,6 +290,7 @@ public class Instance { } if (slot.markReleased()) { + LOG.debug("Return allocated slot {}.", slot); synchronized (instanceLock) { if (isDead) { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java index 730e08a..341ef95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java @@ -244,11 +244,11 @@ public abstract class Slot { @Override public String toString() { - return hierarchy() + " - " + instance.getId() + " - " + getStateName(status); + return hierarchy() + " - " + instance + " - " + getStateName(status); } protected String hierarchy() { - return "(" + slotNumber + ")" + (getParent() != null ? getParent().hierarchy() : ""); + return (getParent() != null ? getParent().hierarchy() : "") + "(" + slotNumber + ")"; } private static String getStateName(int state) { http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java index 801e9ca..94249de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java @@ -35,6 +35,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.util.AbstractID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -83,6 +85,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; */ public class SlotSharingGroupAssignment { + private final static Logger LOG = 
LoggerFactory.getLogger(SlotSharingGroupAssignment.class); + /** The lock globally guards against concurrent modifications in the data structures */ private final Object lock = new Object(); @@ -485,6 +489,7 @@ public class SlotSharingGroupAssignment { // check whether the slot is already released if (simpleSlot.markReleased()) { + LOG.debug("Release simple slot {}.", simpleSlot); AbstractID groupID = simpleSlot.getGroupID(); SharedSlot parent = simpleSlot.getParent(); @@ -581,6 +586,8 @@ public class SlotSharingGroupAssignment { // we remove ourselves from our parent slot if (sharedSlot.markReleased()) { + LOG.debug("Internally dispose empty shared slot {}.", sharedSlot); + int parentRemaining = parent.removeDisposedChildSlot(sharedSlot); if (parentRemaining > 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 579a6b4..940082e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -209,7 +209,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { constraint.lockLocation(); } - updateLocalityCounters(slotFromGroup.getLocality(), vertex, slotFromGroup.getInstance()); + updateLocalityCounters(slotFromGroup, vertex); return slotFromGroup; } @@ -279,7 +279,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { constraint.lockLocation(); } - updateLocalityCounters(toUse.getLocality(), vertex, toUse.getInstance()); + updateLocalityCounters(toUse, vertex); } catch (NoResourceAvailableException e) { throw e; 
@@ -303,7 +303,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); if (slot != null) { - updateLocalityCounters(slot.getLocality(), vertex, slot.getInstance()); + updateLocalityCounters(slot, vertex); return slot; } else { @@ -570,7 +570,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { } } - private void updateLocalityCounters(Locality locality, ExecutionVertex vertex, Instance location) { + private void updateLocalityCounters(SimpleSlot slot, ExecutionVertex vertex) { + Locality locality = slot.getLocality(); + switch (locality) { case UNCONSTRAINED: this.unconstrainedAssignments++; @@ -588,13 +590,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { if (LOG.isDebugEnabled()) { switch (locality) { case UNCONSTRAINED: - LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location); + LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot); break; case LOCAL: - LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location); + LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot); break; case NON_LOCAL: - LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location); + LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot); break; } }