Repository: flink
Updated Branches:
  refs/heads/master d38154129 -> 5374ba97c


[FLINK-2225] [scheduler] Excludes static code paths from co-location constraint to avoid scheduling problems

This closes #843.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5374ba97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5374ba97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5374ba97

Branch: refs/heads/master
Commit: 5374ba97cd863af3062c9b6fa9f96c4b4ebe9a0a
Parents: d381541
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Jun 16 12:27:02 2015 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Jun 16 23:36:48 2015 +0200

----------------------------------------------------------------------
 .../optimizer/plantranslate/JobGraphGenerator.java  |  5 +++--
 .../org/apache/flink/runtime/instance/Instance.java |  5 +++++
 .../org/apache/flink/runtime/instance/Slot.java     |  4 ++--
 .../instance/SlotSharingGroupAssignment.java        |  7 +++++++
 .../runtime/jobmanager/scheduler/Scheduler.java     | 16 +++++++++-------
 5 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 109be20..281e425 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -527,8 +527,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
                        
                        if (this.currentIteration != null) {
                                JobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
-                               // the head may still be null if we descend into the static parts first
-                               if (head != null) {
+                               // Exclude static code paths from the co-location constraint, because otherwise
+                               // their execution determines the deployment slots of the co-location group
+                               if (node.isOnDynamicPath()) {
                                        targetVertex.setStrictlyCoLocatedWith(head);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 7b48693..39caf08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -29,6 +29,8 @@ import akka.actor.ActorRef;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
@@ -36,6 +38,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
  */
 public class Instance {
 
+       private final static Logger LOG = LoggerFactory.getLogger(Instance.class);
+
        /** The lock on which to synchronize allocations and failure state changes */
        private final Object instanceLock = new Object();
 
@@ -286,6 +290,7 @@ public class Instance {
                }
 
                if (slot.markReleased()) {
+                       LOG.debug("Return allocated slot {}.", slot);
                        synchronized (instanceLock) {
                                if (isDead) {
                                        return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 730e08a..341ef95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -244,11 +244,11 @@ public abstract class Slot {
 
        @Override
        public String toString() {
-               return hierarchy() + " - " + instance.getId() + " - " + getStateName(status);
+               return hierarchy() + " - " + instance + " - " + getStateName(status);
        }
 
        protected String hierarchy() {
-               return "(" + slotNumber + ")" + (getParent() != null ? getParent().hierarchy() : "");
+               return (getParent() != null ? getParent().hierarchy() : "") + "(" + slotNumber + ")";
        }
 
        private static String getStateName(int state) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 801e9ca..94249de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -83,6 +85,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
  */
 public class SlotSharingGroupAssignment {
 
+       private final static Logger LOG = LoggerFactory.getLogger(SlotSharingGroupAssignment.class);
+
        /** The lock globally guards against concurrent modifications in the data structures */
        private final Object lock = new Object();
        
@@ -485,6 +489,7 @@ public class SlotSharingGroupAssignment {
 
                                // check whether the slot is already released
                                if (simpleSlot.markReleased()) {
+                                       LOG.debug("Release simple slot {}.", simpleSlot);
 
                                        AbstractID groupID = simpleSlot.getGroupID();
                                        SharedSlot parent = simpleSlot.getParent();
@@ -581,6 +586,8 @@ public class SlotSharingGroupAssignment {
                        // we remove ourselves from our parent slot
 
                        if (sharedSlot.markReleased()) {
+                               LOG.debug("Internally dispose empty shared slot {}.", sharedSlot);
+
                                int parentRemaining = parent.removeDisposedChildSlot(sharedSlot);
                                
                                if (parentRemaining > 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 579a6b4..940082e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -209,7 +209,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
                                                        constraint.lockLocation();
                                                }
                                                
-                                               updateLocalityCounters(slotFromGroup.getLocality(), vertex, slotFromGroup.getInstance());
+                                               updateLocalityCounters(slotFromGroup, vertex);
                                                return slotFromGroup;
                                        }
                                        
@@ -279,7 +279,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
                                                constraint.lockLocation();
                                        }
                                        
-                                       updateLocalityCounters(toUse.getLocality(), vertex, toUse.getInstance());
+                                       updateLocalityCounters(toUse, vertex);
                                }
                                catch (NoResourceAvailableException e) {
                                        throw e;
@@ -303,7 +303,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
                                
                                SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
                                if (slot != null) {
-                                       updateLocalityCounters(slot.getLocality(), vertex, slot.getInstance());
+                                       updateLocalityCounters(slot, vertex);
                                        return slot;
                                }
                                else {
@@ -570,7 +570,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
                }
        }
        
-       private void updateLocalityCounters(Locality locality, ExecutionVertex vertex, Instance location) {
+       private void updateLocalityCounters(SimpleSlot slot, ExecutionVertex vertex) {
+               Locality locality = slot.getLocality();
+
                switch (locality) {
                case UNCONSTRAINED:
                        this.unconstrainedAssignments++;
@@ -588,13 +590,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
                if (LOG.isDebugEnabled()) {
                        switch (locality) {
                                case UNCONSTRAINED:
-                                       LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+                                       LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
                                        break;
                                case LOCAL:
-                                       LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+                                       LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
                                        break;
                                case NON_LOCAL:
-                                       LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+                                       LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
                                        break;
                        }
                }

Reply via email to