http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
deleted file mode 100644
index 0b19a5f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-
-public class SubSlot extends AllocatedSlot {
-
-       private final SharedSlot sharedSlot;
-       
-       private final AbstractID groupId;
-       
-       private final int subSlotNumber;
-       
-       
-       public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID 
groupId) {
-               super(sharedSlot.getAllocatedSlot().getJobID(),
-                               sharedSlot.getAllocatedSlot().getInstance(),
-                               sharedSlot.getAllocatedSlot().getSlotNumber());
-               
-               this.sharedSlot = sharedSlot;
-               this.groupId = groupId;
-               this.subSlotNumber = subSlotNumber;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public void releaseSlot() {
-               // cancel everything, if there is something. since this is 
atomically status based,
-               // it will not happen twice if another attempt happened before 
or concurrently
-               try {
-                       cancel();
-               }
-               finally {
-                       if (markReleased()) {
-                               this.sharedSlot.returnAllocatedSlot(this);
-                       }
-               }
-       }
-       
-       public SharedSlot getSharedSlot() {
-               return this.sharedSlot;
-       }
-       
-       public AbstractID getGroupId() {
-               return groupId;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "SubSlot " + subSlotNumber + " (" + super.toString() + 
')';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
new file mode 100644
index 0000000..10796b6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.web;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.event.job.AbstractEvent;
+import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
+import org.apache.flink.runtime.event.job.JobEvent;
+import org.apache.flink.runtime.event.job.RecentJobEvent;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.StringUtils;
+import org.eclipse.jetty.io.EofException;
+
+public class JobManagerInfoServlet extends HttpServlet {
+       
+       private static final long serialVersionUID = 1L;
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerInfoServlet.class);
+       
+       /** Underlying JobManager */
+       private final JobManager jobmanager;
+       
+       
+       public JobManagerInfoServlet(JobManager jobmanager) { // keeps the JobManager that the JSON writer methods of this servlet query
+               this.jobmanager = jobmanager;
+       }
+       
+       
+       @Override
+       protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
+                       
+               resp.setStatus(HttpServletResponse.SC_OK);
+               resp.setContentType("application/json");
+               
+               try {
+                       if("archive".equals(req.getParameter("get"))) {
+                               writeJsonForArchive(resp.getWriter(), 
jobmanager.getOldJobs());
+                       }
+                       else if("job".equals(req.getParameter("get"))) {
+                               String jobId = req.getParameter("job");
+                               writeJsonForArchivedJob(resp.getWriter(), 
jobmanager.getArchive().getJob(JobID.fromHexString(jobId)));
+                       }
+                       else if("groupvertex".equals(req.getParameter("get"))) {
+                               String jobId = req.getParameter("job");
+                               String groupvertexId = 
req.getParameter("groupvertex");
+                               
writeJsonForArchivedJobGroupvertex(resp.getWriter(), 
jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), 
JobVertexID.fromHexString(groupvertexId));
+                       }
+                       else if("taskmanagers".equals(req.getParameter("get"))) 
{
+                               resp.getWriter().write("{\"taskmanagers\": " + 
jobmanager.getNumberOfTaskManagers() +", \"slots\": 
"+jobmanager.getTotalNumberOfRegisteredSlots()+"}");
+                       }
+                       else if("cancel".equals(req.getParameter("get"))) {
+                               String jobId = req.getParameter("job");
+                               
jobmanager.cancelJob(JobID.fromHexString(jobId));
+                       }
+                       else if("updates".equals(req.getParameter("get"))) {
+                               String jobId = req.getParameter("job");
+                               writeJsonUpdatesForJob(resp.getWriter(), 
JobID.fromHexString(jobId));
+                       } else if ("version".equals(req.getParameter("get"))) {
+                               writeJsonForVersion(resp.getWriter());
+                       }
+                       else{
+                               writeJsonForJobs(resp.getWriter(), 
jobmanager.getRecentJobs());
+                       }
+                       
+               } catch (Exception e) {
+                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                       resp.getWriter().print(e.getMessage());
+                       if (LOG.isWarnEnabled()) {
+                               LOG.warn(StringUtils.stringifyException(e));
+                       }
+               }
+       }
+       
+       /**
+        * Writes a JSON array with one entry per recent job to the given writer. Each entry
+        * is produced by {@link #writeJsonForJob(PrintWriter, RecentJobEvent)}.
+        * <p>
+        * IOExceptions (including Jetty's EofException) are logged at INFO level and not rethrown.
+        *
+        * @param wrt the writer that receives the JSON output
+        * @param jobs the recent jobs to serialize, written in list order
+        */
+       private void writeJsonForJobs(PrintWriter wrt, List<RecentJobEvent> jobs) {
+               try {
+                       wrt.write("[");
+
+                       for (int pos = 0; pos < jobs.size(); pos++) {
+                               // every entry but the first is preceded by a separator
+                               if (pos > 0) {
+                                       wrt.write(",");
+                               }
+                               writeJsonForJob(wrt, jobs.get(pos));
+                       }
+
+                       wrt.write("]");
+               }
+               catch (IOException ioe) { // Connection closed by client
+                       if (ioe instanceof EofException) {
+                               LOG.info("Info server for jobmanager: Connection closed by client, EofException");
+                       }
+                       else {
+                               LOG.info("Info server for jobmanager: Connection closed by client, IOException");
+                       }
+               }
+       }
+       
+       private void writeJsonForJob(PrintWriter wrt, RecentJobEvent jobEvent) 
throws IOException {
+               
+               ExecutionGraph graph = 
jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
+               
+               //Serialize job to json
+               wrt.write("{");
+               wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
+               wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
+               wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
+               wrt.write("\"time\": " + jobEvent.getTimestamp()+",");
+               
+               // Serialize ManagementGraph to json
+               wrt.write("\"groupvertices\": [");
+               boolean first = true;
+               
+               for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
+                       //Write seperator between json objects
+                       if(first) {
+                               first = false;
+                       } else {
+                               wrt.write(","); }
+                       
+                       wrt.write(JsonFactory.toJson(groupVertex));
+               }
+               wrt.write("]");
+               wrt.write("}");
+                       
+       }
+       
+       /**
+        * Writes a JSON array describing the given archived jobs, newest first.
+        * Each entry holds the job id, job name, status and time of one job.
+        * <p>
+        * Note that the passed list is sorted in place.
+        *
+        * @param wrt the writer that receives the JSON output
+        * @param jobs the archived jobs; reordered by descending timestamp as a side effect
+        */
+       private void writeJsonForArchive(PrintWriter wrt, List<RecentJobEvent> jobs) {
+
+               wrt.write("[");
+
+               // sort jobs by time, newest first
+               Collections.sort(jobs, new Comparator<RecentJobEvent>() {
+                       @Override
+                       public int compare(RecentJobEvent o1, RecentJobEvent o2) {
+                               final long t1 = o1.getTimestamp();
+                               final long t2 = o2.getTimestamp();
+
+                               // Equal timestamps must compare as 0. Returning -1 for ties makes
+                               // compare(a, b) and compare(b, a) both negative, which violates the
+                               // Comparator contract and may make the sort fail.
+                               if (t1 < t2) {
+                                       return 1;
+                               } else if (t1 > t2) {
+                                       return -1;
+                               } else {
+                                       return 0;
+                               }
+                       }
+               });
+
+               // Loop Jobs
+               for (int i = 0; i < jobs.size(); i++) {
+                       RecentJobEvent jobEvent = jobs.get(i);
+
+                       // Serialize job to json
+                       wrt.write("{");
+                       wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
+                       wrt.write("\"jobname\": \"" + jobEvent.getJobName() + "\",");
+                       wrt.write("\"status\": \"" + jobEvent.getJobStatus() + "\",");
+                       wrt.write("\"time\": " + jobEvent.getTimestamp());
+                       wrt.write("}");
+
+                       // Write separator between json objects
+                       if (i != jobs.size() - 1) {
+                               wrt.write(",");
+                       }
+               }
+               wrt.write("]");
+       }
+       
+       /**
+        * Writes information about one archived job in JSON format: a one-element array whose
+        * object holds the job id, name and status, the job status timestamps, the group
+        * vertices ("groupvertices"), the accumulators and the start/end time of every group
+        * vertex ("groupverticetimes"). For a job in status FAILED, a "failednodes" list is
+        * written as well.
+        * <p>
+        * IOExceptions (including Jetty's EofException) are logged at INFO level and not rethrown.
+        * 
+        * @param wrt the writer that receives the JSON output
+        * @param jobEvent the archived job to describe; its execution graph is looked up
+        *                 via the JobManager
+        */
+       private void writeJsonForArchivedJob(PrintWriter wrt, RecentJobEvent 
jobEvent) {
+               
+               try {
+               
+                       wrt.write("[");
+               
+                       ExecutionGraph graph = 
jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
+                       
+                       //Serialize job to json
+                       wrt.write("{");
+                       wrt.write("\"jobid\": \"" + jobEvent.getJobID() + 
"\",");
+                       wrt.write("\"jobname\": \"" + 
jobEvent.getJobName()+"\",");
+                       wrt.write("\"status\": \""+ jobEvent.getJobStatus() + 
"\",");
+                       wrt.write("\"SCHEDULED\": "+ 
graph.getStatusTimestamp(JobStatus.CREATED) + ","); // "SCHEDULED" reports the timestamp of JobStatus.CREATED
+                       wrt.write("\"RUNNING\": "+ 
graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
+                       wrt.write("\"FINISHED\": "+ 
graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
+                       wrt.write("\"FAILED\": "+ 
graph.getStatusTimestamp(JobStatus.FAILED) + ",");
+                       wrt.write("\"CANCELED\": "+ 
graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
+
+                       if (jobEvent.getJobStatus() == JobStatus.FAILED) {
+                               wrt.write("\"failednodes\": [");
+                               boolean first = true;
+                               for (ExecutionVertex vertex : 
graph.getAllExecutionVertices()) {
+                                       if (vertex.getExecutionState() == 
ExecutionState.FAILED) {
+                                               SimpleSlot slot = 
vertex.getCurrentAssignedResource();
+                                               Throwable failureCause = 
vertex.getFailureCause();
+                                               if (slot != null || 
failureCause != null) { // failed vertices with neither a slot nor a cause are skipped
+                                                       if (first) {
+                                                               first = false;
+                                                       } else {
+                                                               wrt.write(",");
+                                                       }
+                                                       wrt.write("{");
+                                                       wrt.write("\"node\": 
\"" + (slot == null ? "(none)" : 
slot.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
+                                                       wrt.write("\"message\": 
\"" + (failureCause == null ? "" : 
StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + 
"\"");
+                                                       wrt.write("}");
+                                               }
+                                       }
+                               }
+                               wrt.write("],");
+                       }
+
+                       // Serialize the job vertices of the execution graph to json
+                       wrt.write("\"groupvertices\": [");
+                       boolean first = true;
+                       for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
+                               // Write separator between json objects
+                               if(first) {
+                                       first = false;
+                               } else {
+                                       wrt.write(","); }
+                               
+                               wrt.write(JsonFactory.toJson(groupVertex));
+                               
+                       }
+                       wrt.write("],");
+                       
+                       // write accumulators (name plus value class, and the value's string form)
+                       Map<String, Object> accMap = 
AccumulatorHelper.toResultMap(jobmanager.getAccumulators(jobEvent.getJobID()));
+                       
+                       wrt.write("\n\"accumulators\": [");
+                       int i = 0;
+                       for( Entry<String, Object> accumulator : 
accMap.entrySet()) {
+                               wrt.write("{ \"name\": 
\""+accumulator.getKey()+" (" + 
accumulator.getValue().getClass().getName()+")\","
+                                               + " \"value\": 
\""+accumulator.getValue().toString()+"\"}\n");
+                               if(++i < accMap.size()) { // separator after every entry except the last
+                                       wrt.write(",");
+                               }
+                       }
+                       wrt.write("],\n");
+                       
+                       wrt.write("\"groupverticetimes\": {");
+                       first = true;
+                       for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
+                               
+                               if(first) {
+                                       first = false;
+                               } else {
+                                       wrt.write(","); }
+                               
+                               // Calculate start and end time for groupvertex
+                               long started = Long.MAX_VALUE; // stays Long.MAX_VALUE when no member has a non-zero RUNNING timestamp
+                               long ended = 0; // stays 0 when no member has a non-zero FINISHED, CANCELED or FAILED timestamp
+                               
+                               // Take earliest running state and latest 
endstate of groupmembers
+                               for (ExecutionVertex vertex : 
groupVertex.getTaskVertices()) {
+                                       
+                                       long running = 
vertex.getStateTimestamp(ExecutionState.RUNNING);
+                                       if (running != 0 && running < started) {
+                                               started = running;
+                                       }
+                                       
+                                       long finished = 
vertex.getStateTimestamp(ExecutionState.FINISHED);
+                                       long canceled = 
vertex.getStateTimestamp(ExecutionState.CANCELED);
+                                       long failed = 
vertex.getStateTimestamp(ExecutionState.FAILED);
+                                       
+                                       if(finished != 0 && finished > ended) {
+                                               ended = finished;
+                                       }
+                                       
+                                       if(canceled != 0 && canceled > ended) {
+                                               ended = canceled;
+                                       }
+                                       
+                                       if(failed != 0 && failed > ended) {
+                                               ended = failed;
+                                       }
+
+                               }
+                               
+                               
wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
+                               wrt.write("\"groupvertexid\": \"" + 
groupVertex.getJobVertexId() + "\",");
+                               wrt.write("\"groupvertexname\": \"" + 
groupVertex + "\",");
+                               wrt.write("\"STARTED\": "+ started + ",");
+                               wrt.write("\"ENDED\": "+ ended);
+                               wrt.write("}");
+                               
+                       }
+
+                       wrt.write("}");
+                       
+                       wrt.write("}");
+                       
+                       
+               wrt.write("]");
+               
+               } catch (EofException eof) { // Connection closed by client
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
+               } catch (IOException ioe) { // Connection closed by client      
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
+               } 
+               
+       }
+       
+       
+       /**
+        * Writes the events that the JobManager returns for the given job as one JSON object:
+        * the job id, the current system time ("timestamp"), the ids of all recent jobs
+        * ("recentjobs"), every ExecutionStateChangeEvent ("vertexevents") and every JobEvent
+        * ("jobevents"). All timestamps in this object are written as quoted strings.
+        * <p>
+        * IOExceptions (including Jetty's EofException) are logged at INFO level and not rethrown.
+        * 
+        * @param wrt the writer that receives the JSON output
+        * @param jobId the id of the job whose events are fetched from the JobManager
+        */
+       private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
+               
+               try {
+                       
+                       List<AbstractEvent> events = 
jobmanager.getEvents(jobId);
+                       
+                       //Serialize job to json
+                       wrt.write("{");
+                       wrt.write("\"jobid\": \"" + jobId + "\",");
+                       wrt.write("\"timestamp\": \"" + 
System.currentTimeMillis() + "\",");
+                       wrt.write("\"recentjobs\": [");
+                               
+                       boolean first = true;
+                       for(RecentJobEvent rje: jobmanager.getRecentJobs()) {
+                               if(first) {
+                                       first = false;
+                               } else {
+                                       wrt.write(","); }
+                               
+                               wrt.write("\""+rje.getJobID().toString()+"\""); 
+                       }
+                                       
+                       wrt.write("],");
+                       
+                       wrt.write("\"vertexevents\": [");
+               
+                       first = true; // the separator flag is reused for the next array
+                       for (AbstractEvent event: events) {
+                               
+                               if (event instanceof ExecutionStateChangeEvent) 
{
+                                       
+                                       if(first) {
+                                               first = false;
+                                       } else {
+                                               wrt.write(","); }
+                                       
+                                       ExecutionStateChangeEvent vertexevent = 
(ExecutionStateChangeEvent) event;
+                                       wrt.write("{");
+                                       wrt.write("\"vertexid\": \"" + 
vertexevent.getExecutionAttemptID() + "\","); // "vertexid" carries the execution attempt id
+                                       wrt.write("\"newstate\": \"" + 
vertexevent.getNewExecutionState() + "\",");
+                                       wrt.write("\"timestamp\": \"" + 
vertexevent.getTimestamp() + "\"");
+                                       wrt.write("}");
+                               }
+                       }
+                       
+                       wrt.write("],");
+                       
+                       wrt.write("\"jobevents\": [");
+                       
+                       first = true; // second pass over the same event list, this time for job events
+                       for(AbstractEvent event: events) {
+                               
+                               if( event instanceof JobEvent) {
+                                       
+                                       if(first) {
+                                               first = false;
+                                       } else {
+                                               wrt.write(","); }
+                                       
+                                       JobEvent jobevent = (JobEvent) event;
+                                       wrt.write("{");
+                                       wrt.write("\"newstate\": \"" + 
jobevent.getCurrentJobStatus() + "\",");
+                                       wrt.write("\"timestamp\": \"" + 
jobevent.getTimestamp() + "\"");
+                                       wrt.write("}");
+                               }
+                       }
+                       
+                       wrt.write("]");
+                       
+                       wrt.write("}");
+                       
+               
+               } catch (EofException eof) { // Connection closed by client
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
+               } catch (IOException ioe) { // Connection closed by client      
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
+               } 
+               
+       }
+       
+       /**
+        * Writes info about one particular archived JobVertex of a job as JSON: the vertex
+        * itself ("groupvertex") and, under "verticetimes", the state timestamps of execution
+        * vertices, keyed by the id of each vertex's current execution attempt.
+        * Note that "verticetimes" iterates over all job vertices of the graph, not only over
+        * the vertex identified by vertexId.
+        * <p>
+        * IOExceptions are logged (at DEBUG with the exception if enabled, otherwise at INFO)
+        * and not rethrown.
+        * 
+        * @param wrt the writer that receives the JSON output
+        * @param jobEvent the archived job that contains the vertex
+        * @param vertexId the id of the job vertex written as "groupvertex"
+        */
+       private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, 
RecentJobEvent jobEvent, JobVertexID vertexId) {
+               try {
+                       ExecutionGraph graph = 
jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
+                       
+                       ExecutionJobVertex jobVertex = 
graph.getJobVertex(vertexId);
+                       
+                       // Serialize the requested job vertex to json
+                       wrt.write("{\"groupvertex\": " + 
JsonFactory.toJson(jobVertex) + ",");
+                       
+                       wrt.write("\"verticetimes\": {");
+                       boolean first = true; // shared across both loops so separators span all job vertices
+                       for (ExecutionJobVertex groupVertex : 
graph.getAllVertices().values()) {
+                               
+                               for (ExecutionVertex vertex : 
groupVertex.getTaskVertices()) {
+                                       
+                                       Execution exec = 
vertex.getCurrentExecutionAttempt();
+                                       
+                                       if(first) {
+                                               first = false;
+                                       } else {
+                                               wrt.write(","); }
+                                       
+                                       wrt.write("\""+exec.getAttemptId() 
+"\": {");
+                                       wrt.write("\"vertexid\": \"" + 
exec.getAttemptId() + "\",");
+                                       wrt.write("\"vertexname\": \"" + vertex 
+ "\",");
+                                       wrt.write("\"CREATED\": "+ 
vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
+                                       wrt.write("\"SCHEDULED\": "+ 
vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
+                                       wrt.write("\"DEPLOYING\": "+ 
vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
+                                       wrt.write("\"RUNNING\": "+ 
vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
+                                       wrt.write("\"FINISHED\": "+ 
vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
+                                       wrt.write("\"CANCELING\": "+ 
vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
+                                       wrt.write("\"CANCELED\": "+ 
vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
+                                       wrt.write("\"FAILED\": "+ 
vertex.getStateTimestamp(ExecutionState.FAILED) + "");
+                                       wrt.write("}");
+                               }
+                               
+                       }
+                       wrt.write("}}");
+                       
+               }
+               catch (IOException ioe) { // Connection closed by client
+                       String message = "Info server for jobmanager: 
Connection closed by client - " + ioe.getClass().getSimpleName();
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug(message, ioe);
+                       }
+                       else if (LOG.isInfoEnabled()) {
+                               LOG.info(message);
+                       }
+               } 
+       }
+       
+       /**
+        * Writes the version and the revision of Flink.
+        * 
+        * @param wrt
+        */
+       private void writeJsonForVersion(PrintWriter wrt) {
+               wrt.write("{");
+               wrt.write("\"version\": \"" + 
EnvironmentInformation.getVersion() + "\",");
+               wrt.write("\"revision\": \"" + 
EnvironmentInformation.getRevisionInformation().commitId + "\"");
+               wrt.write("}");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
deleted file mode 100644
index ef5a246..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.runtime.event.job.AbstractEvent;
-import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
-import org.apache.flink.runtime.event.job.JobEvent;
-import org.apache.flink.runtime.event.job.RecentJobEvent;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.StringUtils;
-import org.eclipse.jetty.io.EofException;
-
-
-public class JobmanagerInfoServlet extends HttpServlet {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(JobmanagerInfoServlet.class);
-       
-       /** Underlying JobManager */
-       private final JobManager jobmanager;
-       
-       
-       public JobmanagerInfoServlet(JobManager jobmanager) {
-               this.jobmanager = jobmanager;
-       }
-       
-       
-       @Override
-       protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
-                       
-               resp.setStatus(HttpServletResponse.SC_OK);
-               resp.setContentType("application/json");
-               
-               try {
-                       if("archive".equals(req.getParameter("get"))) {
-                               writeJsonForArchive(resp.getWriter(), 
jobmanager.getOldJobs());
-                       }
-                       else if("job".equals(req.getParameter("get"))) {
-                               String jobId = req.getParameter("job");
-                               writeJsonForArchivedJob(resp.getWriter(), 
jobmanager.getArchive().getJob(JobID.fromHexString(jobId)));
-                       }
-                       else if("groupvertex".equals(req.getParameter("get"))) {
-                               String jobId = req.getParameter("job");
-                               String groupvertexId = 
req.getParameter("groupvertex");
-                               
writeJsonForArchivedJobGroupvertex(resp.getWriter(), 
jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), 
JobVertexID.fromHexString(groupvertexId));
-                       }
-                       else if("taskmanagers".equals(req.getParameter("get"))) 
{
-                               resp.getWriter().write("{\"taskmanagers\": " + 
jobmanager.getNumberOfTaskManagers() +", \"slots\": 
"+jobmanager.getTotalNumberOfRegisteredSlots()+"}");
-                       }
-                       else if("cancel".equals(req.getParameter("get"))) {
-                               String jobId = req.getParameter("job");
-                               
jobmanager.cancelJob(JobID.fromHexString(jobId));
-                       }
-                       else if("updates".equals(req.getParameter("get"))) {
-                               String jobId = req.getParameter("job");
-                               writeJsonUpdatesForJob(resp.getWriter(), 
JobID.fromHexString(jobId));
-                       } else if ("version".equals(req.getParameter("get"))) {
-                               writeJsonForVersion(resp.getWriter());
-                       }
-                       else{
-                               writeJsonForJobs(resp.getWriter(), 
jobmanager.getRecentJobs());
-                       }
-                       
-               } catch (Exception e) {
-                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-                       resp.getWriter().print(e.getMessage());
-                       if (LOG.isWarnEnabled()) {
-                               LOG.warn(StringUtils.stringifyException(e));
-                       }
-               }
-       }
-       
-       /**
-        * Writes ManagementGraph as Json for all recent jobs
-        * 
-        * @param wrt
-        * @param jobs
-        */
-       private void writeJsonForJobs(PrintWriter wrt, List<RecentJobEvent> 
jobs) {
-               
-               try {
-               
-                       wrt.write("[");
-                       
-                       // Loop Jobs
-                       for (int i = 0; i < jobs.size(); i++) {
-                               RecentJobEvent jobEvent = jobs.get(i);
-       
-                               writeJsonForJob(wrt, jobEvent);
-       
-                               //Write seperator between json objects
-                               if(i != jobs.size() - 1) {
-                                       wrt.write(",");
-                               }
-                       }
-                       wrt.write("]");
-               
-               } catch (EofException eof) { // Connection closed by client
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
-               } catch (IOException ioe) { // Connection closed by client      
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
-               } 
-               
-       }
-       
-       private void writeJsonForJob(PrintWriter wrt, RecentJobEvent jobEvent) 
throws IOException {
-               
-               ExecutionGraph graph = 
jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
-               
-               //Serialize job to json
-               wrt.write("{");
-               wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
-               wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
-               wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
-               wrt.write("\"time\": " + jobEvent.getTimestamp()+",");
-               
-               // Serialize ManagementGraph to json
-               wrt.write("\"groupvertices\": [");
-               boolean first = true;
-               
-               for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
-                       //Write seperator between json objects
-                       if(first) {
-                               first = false;
-                       } else {
-                               wrt.write(","); }
-                       
-                       wrt.write(JsonFactory.toJson(groupVertex));
-               }
-               wrt.write("]");
-               wrt.write("}");
-                       
-       }
-       
-       /**
-        * Writes Json with a list of currently archived jobs, sorted by time
-        * 
-        * @param wrt
-        * @param jobs
-        */
-       private void writeJsonForArchive(PrintWriter wrt, List<RecentJobEvent> 
jobs) {
-               
-               wrt.write("[");
-               
-               // sort jobs by time
-               Collections.sort(jobs,  new Comparator<RecentJobEvent>() {
-                       @Override
-                       public int compare(RecentJobEvent o1, RecentJobEvent 
o2) {
-                               if(o1.getTimestamp() < o2.getTimestamp()) {
-                                       return 1;
-                               } else {
-                                       return -1;
-                               }
-                       }
-                       
-               });
-               
-               // Loop Jobs
-               for (int i = 0; i < jobs.size(); i++) {
-                       RecentJobEvent jobEvent = jobs.get(i);
-                       
-                       //Serialize job to json
-                       wrt.write("{");
-                       wrt.write("\"jobid\": \"" + jobEvent.getJobID() + 
"\",");
-                       wrt.write("\"jobname\": \"" + 
jobEvent.getJobName()+"\",");
-                       wrt.write("\"status\": \""+ jobEvent.getJobStatus() + 
"\",");
-                       wrt.write("\"time\": " + jobEvent.getTimestamp());
-                       
-                       wrt.write("}");
-                       
-                       //Write seperator between json objects
-                       if(i != jobs.size() - 1) {
-                               wrt.write(",");
-                       }
-               }
-               wrt.write("]");
-               
-       }
-       
-       /**
-        * Writes infos about archived job in Json format, including 
groupvertices and groupverticetimes
-        * 
-        * @param wrt
-        * @param jobEvent
-        */
-       private void writeJsonForArchivedJob(PrintWriter wrt, RecentJobEvent 
jobEvent) {
-               
-               try {
-               
-                       wrt.write("[");
-               
-                       ExecutionGraph graph = 
jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
-                       
-                       //Serialize job to json
-                       wrt.write("{");
-                       wrt.write("\"jobid\": \"" + jobEvent.getJobID() + 
"\",");
-                       wrt.write("\"jobname\": \"" + 
jobEvent.getJobName()+"\",");
-                       wrt.write("\"status\": \""+ jobEvent.getJobStatus() + 
"\",");
-                       wrt.write("\"SCHEDULED\": "+ 
graph.getStatusTimestamp(JobStatus.CREATED) + ",");
-                       wrt.write("\"RUNNING\": "+ 
graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
-                       wrt.write("\"FINISHED\": "+ 
graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
-                       wrt.write("\"FAILED\": "+ 
graph.getStatusTimestamp(JobStatus.FAILED) + ",");
-                       wrt.write("\"CANCELED\": "+ 
graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
-
-                       if (jobEvent.getJobStatus() == JobStatus.FAILED) {
-                               wrt.write("\"failednodes\": [");
-                               boolean first = true;
-                               for (ExecutionVertex vertex : 
graph.getAllExecutionVertices()) {
-                                       if (vertex.getExecutionState() == 
ExecutionState.FAILED) {
-                                               AllocatedSlot slot = 
vertex.getCurrentAssignedResource();
-                                               Throwable failureCause = 
vertex.getFailureCause();
-                                               if (slot != null || 
failureCause != null) {
-                                                       if (first) {
-                                                               first = false;
-                                                       } else {
-                                                               wrt.write(",");
-                                                       }
-                                                       wrt.write("{");
-                                                       wrt.write("\"node\": 
\"" + (slot == null ? "(none)" : 
slot.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
-                                                       wrt.write("\"message\": 
\"" + (failureCause == null ? "" : 
StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + 
"\"");
-                                                       wrt.write("}");
-                                               }
-                                       }
-                               }
-                               wrt.write("],");
-                       }
-
-                       // Serialize ManagementGraph to json
-                       wrt.write("\"groupvertices\": [");
-                       boolean first = true;
-                       for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
-                               //Write seperator between json objects
-                               if(first) {
-                                       first = false;
-                               } else {
-                                       wrt.write(","); }
-                               
-                               wrt.write(JsonFactory.toJson(groupVertex));
-                               
-                       }
-                       wrt.write("],");
-                       
-                       // write accumulators
-                       Map<String, Object> accMap = 
AccumulatorHelper.toResultMap(jobmanager.getAccumulators(jobEvent.getJobID()));
-                       
-                       wrt.write("\n\"accumulators\": [");
-                       int i = 0;
-                       for( Entry<String, Object> accumulator : 
accMap.entrySet()) {
-                               wrt.write("{ \"name\": 
\""+accumulator.getKey()+" (" + 
accumulator.getValue().getClass().getName()+")\","
-                                               + " \"value\": 
\""+accumulator.getValue().toString()+"\"}\n");
-                               if(++i < accMap.size()) {
-                                       wrt.write(",");
-                               }
-                       }
-                       wrt.write("],\n");
-                       
-                       wrt.write("\"groupverticetimes\": {");
-                       first = true;
-                       for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
-                               
-                               if(first) {
-                                       first = false;
-                               } else {
-                                       wrt.write(","); }
-                               
-                               // Calculate start and end time for groupvertex
-                               long started = Long.MAX_VALUE;
-                               long ended = 0;
-                               
-                               // Take earliest running state and latest 
endstate of groupmembers
-                               for (ExecutionVertex vertex : 
groupVertex.getTaskVertices()) {
-                                       
-                                       long running = 
vertex.getStateTimestamp(ExecutionState.RUNNING);
-                                       if (running != 0 && running < started) {
-                                               started = running;
-                                       }
-                                       
-                                       long finished = 
vertex.getStateTimestamp(ExecutionState.FINISHED);
-                                       long canceled = 
vertex.getStateTimestamp(ExecutionState.CANCELED);
-                                       long failed = 
vertex.getStateTimestamp(ExecutionState.FAILED);
-                                       
-                                       if(finished != 0 && finished > ended) {
-                                               ended = finished;
-                                       }
-                                       
-                                       if(canceled != 0 && canceled > ended) {
-                                               ended = canceled;
-                                       }
-                                       
-                                       if(failed != 0 && failed > ended) {
-                                               ended = failed;
-                                       }
-
-                               }
-                               
-                               
wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
-                               wrt.write("\"groupvertexid\": \"" + 
groupVertex.getJobVertexId() + "\",");
-                               wrt.write("\"groupvertexname\": \"" + 
groupVertex + "\",");
-                               wrt.write("\"STARTED\": "+ started + ",");
-                               wrt.write("\"ENDED\": "+ ended);
-                               wrt.write("}");
-                               
-                       }
-
-                       wrt.write("}");
-                       
-                       wrt.write("}");
-                       
-                       
-               wrt.write("]");
-               
-               } catch (EofException eof) { // Connection closed by client
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
-               } catch (IOException ioe) { // Connection closed by client      
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
-               } 
-               
-       }
-       
-       
-       /**
-        * Writes all updates (events) for a given job since a given time
-        * 
-        * @param wrt
-        * @param jobId
-        */
-       private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
-               
-               try {
-                       
-                       List<AbstractEvent> events = 
jobmanager.getEvents(jobId);
-                       
-                       //Serialize job to json
-                       wrt.write("{");
-                       wrt.write("\"jobid\": \"" + jobId + "\",");
-                       wrt.write("\"timestamp\": \"" + 
System.currentTimeMillis() + "\",");
-                       wrt.write("\"recentjobs\": [");
-                               
-                       boolean first = true;
-                       for(RecentJobEvent rje: jobmanager.getRecentJobs()) {
-                               if(first) {
-                                       first = false;
-                               } else {
-                                       wrt.write(","); }
-                               
-                               wrt.write("\""+rje.getJobID().toString()+"\""); 
-                       }
-                                       
-                       wrt.write("],");
-                       
-                       wrt.write("\"vertexevents\": [");
-               
-                       first = true;
-                       for (AbstractEvent event: events) {
-                               
-                               if (event instanceof ExecutionStateChangeEvent) 
{
-                                       
-                                       if(first) {
-                                               first = false;
-                                       } else {
-                                               wrt.write(","); }
-                                       
-                                       ExecutionStateChangeEvent vertexevent = 
(ExecutionStateChangeEvent) event;
-                                       wrt.write("{");
-                                       wrt.write("\"vertexid\": \"" + 
vertexevent.getExecutionAttemptID() + "\",");
-                                       wrt.write("\"newstate\": \"" + 
vertexevent.getNewExecutionState() + "\",");
-                                       wrt.write("\"timestamp\": \"" + 
vertexevent.getTimestamp() + "\"");
-                                       wrt.write("}");
-                               }
-                       }
-                       
-                       wrt.write("],");
-                       
-                       wrt.write("\"jobevents\": [");
-                       
-                       first = true;
-                       for(AbstractEvent event: events) {
-                               
-                               if( event instanceof JobEvent) {
-                                       
-                                       if(first) {
-                                               first = false;
-                                       } else {
-                                               wrt.write(","); }
-                                       
-                                       JobEvent jobevent = (JobEvent) event;
-                                       wrt.write("{");
-                                       wrt.write("\"newstate\": \"" + 
jobevent.getCurrentJobStatus() + "\",");
-                                       wrt.write("\"timestamp\": \"" + 
jobevent.getTimestamp() + "\"");
-                                       wrt.write("}");
-                               }
-                       }
-                       
-                       wrt.write("]");
-                       
-                       wrt.write("}");
-                       
-               
-               } catch (EofException eof) { // Connection closed by client
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
-               } catch (IOException ioe) { // Connection closed by client      
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
-               } 
-               
-       }
-       
-       /**
-        * Writes info about one particular archived JobVertex in a job, 
including all member execution vertices, their times and statuses.
-        */
-       private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, 
RecentJobEvent jobEvent, JobVertexID vertexId) {
-               try {
-                       ExecutionGraph graph = 
jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
-                       
-                       ExecutionJobVertex jobVertex = 
graph.getJobVertex(vertexId);
-                       
-                       // Serialize ManagementGraph to json
-                       wrt.write("{\"groupvertex\": " + 
JsonFactory.toJson(jobVertex) + ",");
-                       
-                       wrt.write("\"verticetimes\": {");
-                       boolean first = true;
-                       for (ExecutionJobVertex groupVertex : 
graph.getAllVertices().values()) {
-                               
-                               for (ExecutionVertex vertex : 
groupVertex.getTaskVertices()) {
-                                       
-                                       Execution exec = 
vertex.getCurrentExecutionAttempt();
-                                       
-                                       if(first) {
-                                               first = false;
-                                       } else {
-                                               wrt.write(","); }
-                                       
-                                       wrt.write("\""+exec.getAttemptId() 
+"\": {");
-                                       wrt.write("\"vertexid\": \"" + 
exec.getAttemptId() + "\",");
-                                       wrt.write("\"vertexname\": \"" + vertex 
+ "\",");
-                                       wrt.write("\"CREATED\": "+ 
vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
-                                       wrt.write("\"SCHEDULED\": "+ 
vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
-                                       wrt.write("\"DEPLOYING\": "+ 
vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
-                                       wrt.write("\"RUNNING\": "+ 
vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
-                                       wrt.write("\"FINISHED\": "+ 
vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
-                                       wrt.write("\"CANCELING\": "+ 
vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
-                                       wrt.write("\"CANCELED\": "+ 
vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
-                                       wrt.write("\"FAILED\": "+ 
vertex.getStateTimestamp(ExecutionState.FAILED) + "");
-                                       wrt.write("}");
-                               }
-                               
-                       }
-                       wrt.write("}}");
-                       
-               }
-               catch (IOException ioe) { // Connection closed by client
-                       String message = "Info server for jobmanager: 
Connection closed by client - " + ioe.getClass().getSimpleName();
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug(message, ioe);
-                       }
-                       else if (LOG.isInfoEnabled()) {
-                               LOG.info(message);
-                       }
-               } 
-       }
-       
-       /**
-        * Writes the version and the revision of Flink.
-        * 
-        * @param wrt
-        */
-       private void writeJsonForVersion(PrintWriter wrt) {
-               wrt.write("{");
-               wrt.write("\"version\": \"" + 
EnvironmentInformation.getVersion() + "\",");
-               wrt.write("\"revision\": \"" + 
EnvironmentInformation.getRevisionInformation().commitId + "\"");
-               wrt.write("}");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 1469a4c..0447b00 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.util.StringUtils;
 
@@ -39,7 +39,7 @@ public class JsonFactory {
                json.append("\"vertexname\": \"" + 
StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
                json.append("\"vertexstatus\": \"" + vertex.getExecutionState() 
+ "\",");
                
-               AllocatedSlot slot = vertex.getCurrentAssignedResource();
+               SimpleSlot slot = vertex.getCurrentAssignedResource();
                String instanceName = slot == null ? "(null)" : 
slot.getInstance().getInstanceConnectionInfo().getFQDNHostname();
                
                json.append("\"vertexinstancename\": \"" + instanceName + "\"");

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 9766ab6..31800e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -124,7 +124,7 @@ public class WebInfoServer {
                // ----- the handlers for the servlets -----
                ServletContextHandler servletContext = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
                servletContext.setContextPath("/");
-               servletContext.addServlet(new ServletHolder(new 
JobmanagerInfoServlet(jobmanager)), "/jobsInfo");
+               servletContext.addServlet(new ServletHolder(new 
JobManagerInfoServlet(jobmanager)), "/jobsInfo");
                servletContext.addServlet(new ServletHolder(new 
LogfileInfoServlet(logDirFiles)), "/logInfo");
                servletContext.addServlet(new ServletHolder(new 
SetupInfoServlet(jobmanager)), "/setupInfo");
                servletContext.addServlet(new ServletHolder(new MenuServlet()), 
"/menu");

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
index 241d6cd..f9126fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
@@ -26,13 +26,12 @@ import java.util.Set;
 
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import 
org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
 import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent;
 
-
 public class JobProfilingData {
 
        private final ExecutionGraph executionGraph;
@@ -59,7 +58,7 @@ public class JobProfilingData {
        public boolean 
addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData 
instanceProfilingData) {
 
                for (ExecutionVertex executionVertex : 
this.executionGraph.getAllExecutionVertices()) {
-                       AllocatedSlot slot = 
executionVertex.getCurrentAssignedResource();
+                       SimpleSlot slot = 
executionVertex.getCurrentAssignedResource();
                        if (slot != null && 
slot.getInstance().getInstanceConnectionInfo().equals(
                                        
instanceProfilingData.getInstanceConnectionInfo()))
                        {
@@ -76,7 +75,7 @@ public class JobProfilingData {
                final Set<Instance> tempSet = new HashSet<Instance>();
                
                for (ExecutionVertex executionVertex : 
this.executionGraph.getAllExecutionVertices()) {
-                       AllocatedSlot slot = 
executionVertex.getCurrentAssignedResource();
+                       SimpleSlot slot = 
executionVertex.getCurrentAssignedResource();
                        if (slot != null) {
                                tempSet.add(slot.getInstance());
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 1c23293..e6e5287 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -423,7 +423,7 @@ public class TaskManager implements TaskOperationProtocol {
                LOG.info("Shutting down TaskManager");
                
                cancelAndClearEverything(new Exception("Task Manager is 
shutting down"));
-               
+
                // first, stop the heartbeat thread and wait for it to terminate
                this.heartbeatThread.interrupt();
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 14ce15a..cf63e43 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -38,8 +38,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -117,7 +117,7 @@ public class ExecutionGraphDeploymentTest {
                        });
                        
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(jobId);
+                       final SimpleSlot slot = 
instance.allocateSimpleSlot(jobId);
                        
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 30b05ee..c685ec0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -32,11 +32,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -67,7 +67,7 @@ public class ExecutionGraphTestUtils {
                }
        }
        
-       public static void setVertexResource(ExecutionVertex vertex, 
AllocatedSlot slot) {
+       public static void setVertexResource(ExecutionVertex vertex, SimpleSlot 
slot) {
                try {
                        Execution exec = vertex.getCurrentExecutionAttempt();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 2848466..f0c9342 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.mock;
 import java.util.Arrays;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -56,7 +56,7 @@ public class ExecutionStateProgressTest {
                        // mock resources and mock taskmanager
                        TaskOperationProtocol taskManager = 
getSimpleAcknowledgingTaskmanager();
                        for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               AllocatedSlot slot = 
getInstance(taskManager).allocateSlot(jid);
+                               SimpleSlot slot = 
getInstance(taskManager).allocateSimpleSlot(jid);
                                ee.deployToSlot(slot);
                        }
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 9769529..ff1ef5a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -120,7 +120,7 @@ public class ExecutionVertexCancelTest {
                        when(taskManager.cancelTask(execId)).thenReturn(new 
TaskOperationResult(execId, true), new TaskOperationResult(execId, false));
                        
                        Instance instance = getInstance(taskManager);
-                       AllocatedSlot slot = instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        vertex.deployToSlot(slot);
                        
@@ -193,7 +193,7 @@ public class ExecutionVertexCancelTest {
                        when(taskManager.cancelTask(execId)).thenReturn(new 
TaskOperationResult(execId, false), new TaskOperationResult(execId, true));
                        
                        Instance instance = getInstance(taskManager);
-                       AllocatedSlot slot = instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        vertex.deployToSlot(slot);
                        
@@ -260,7 +260,7 @@ public class ExecutionVertexCancelTest {
                        when(taskManager.cancelTask(execId)).thenReturn(new 
TaskOperationResult(execId, true));
                        
                        Instance instance = getInstance(taskManager);
-                       AllocatedSlot slot = instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
@@ -301,7 +301,7 @@ public class ExecutionVertexCancelTest {
                        when(taskManager.cancelTask(execId)).thenReturn(new 
TaskOperationResult(execId, true));
                        
                        Instance instance = getInstance(taskManager);
-                       AllocatedSlot slot = instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
@@ -353,7 +353,7 @@ public class ExecutionVertexCancelTest {
                        when(taskManager.cancelTask(execId)).thenReturn(new 
TaskOperationResult(execId, false));
                        
                        Instance instance = getInstance(taskManager);
-                       AllocatedSlot slot = instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
@@ -389,7 +389,7 @@ public class ExecutionVertexCancelTest {
                        when(taskManager.cancelTask(execId)).thenThrow(new 
IOException("RPC call failed"));
                        
                        Instance instance = getInstance(taskManager);
-                       AllocatedSlot slot = instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
@@ -428,7 +428,7 @@ public class ExecutionVertexCancelTest {
                        when(taskManager.cancelTask(execId)).thenThrow(new 
IOException("RPC call failed"));
                        
                        Instance instance = getInstance(taskManager);
-                       AllocatedSlot slot = instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
@@ -484,7 +484,7 @@ public class ExecutionVertexCancelTest {
                        try {
                                TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                                Instance instance = getInstance(taskManager);
-                               AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                               SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
                                
                                vertex.deployToSlot(slot);
                                fail("Method should throw an exception");
@@ -526,7 +526,7 @@ public class ExecutionVertexCancelTest {
                                
                                TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                                Instance instance = getInstance(taskManager);
-                               AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                               SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
                                
                                vertex.deployToSlot(slot);
                                fail("Method should throw an exception");
@@ -540,7 +540,7 @@ public class ExecutionVertexCancelTest {
                                
                                TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                                Instance instance = getInstance(taskManager);
-                               AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                               SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
                                
                                setVertexResource(vertex, slot);
                                setVertexState(vertex, 
ExecutionState.CANCELING);

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index f3081bc..5c7d58c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -30,8 +30,8 @@ import static org.mockito.Matchers.any;
 
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
@@ -52,7 +52,7 @@ public class ExecutionVertexDeploymentTest {
                        TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = 
getJobVertexNotExecuting(jid);
                        
@@ -88,7 +88,7 @@ public class ExecutionVertexDeploymentTest {
                        // mock taskmanager to simply accept the call
                        final TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = 
getJobVertexExecutingSynchronously(jid);
                        
@@ -133,7 +133,7 @@ public class ExecutionVertexDeploymentTest {
                        
                        
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = 
getJobVertexExecutingAsynchronously(jid);
                        
@@ -189,7 +189,7 @@ public class ExecutionVertexDeploymentTest {
                        // mock taskmanager to simply accept the call
                        final TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = 
getJobVertexExecutingSynchronously(jid);
                        
@@ -225,7 +225,7 @@ public class ExecutionVertexDeploymentTest {
                        // mock taskmanager to simply accept the call
                        final TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = 
getJobVertexExecutingAsynchronously(jid);
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0]);
@@ -269,7 +269,7 @@ public class ExecutionVertexDeploymentTest {
                        TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = 
getJobVertexNotExecuting(jid);
                        
@@ -306,7 +306,7 @@ public class ExecutionVertexDeploymentTest {
                        TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = 
getJobVertexExecutingTriggered(jid, queue);
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index e2f0ee8..e1e4549 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -25,8 +25,8 @@ import static org.mockito.Mockito.*;
 
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -47,7 +47,7 @@ public class ExecutionVertexSchedulingTest {
                        // a slot than cannot be deployed to
                        final TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        slot.cancel();
                        assertFalse(slot.isReleased());
                        
@@ -80,7 +80,7 @@ public class ExecutionVertexSchedulingTest {
                        // a slot than cannot be deployed to
                        final TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        slot.cancel();
                        assertFalse(slot.isReleased());
                        
@@ -119,7 +119,7 @@ public class ExecutionVertexSchedulingTest {
                        // a slot than cannot be deployed to
                        final TaskOperationProtocol taskManager = 
mock(TaskOperationProtocol.class);
                        final Instance instance = getInstance(taskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = 
getJobVertexNotExecuting(new JobVertexID());
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
index a9af2cf..a321ad8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
@@ -35,7 +35,7 @@ public class AllocatedSlotTest {
                try {
                        // cancel, then release
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                assertTrue(slot.isAlive());
                                
                                slot.cancel();
@@ -51,7 +51,7 @@ public class AllocatedSlotTest {
                        
                        // release immediately
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                assertTrue(slot.isAlive());
                                
                                slot.releaseSlot();
@@ -74,32 +74,32 @@ public class AllocatedSlotTest {
                        
                        // assign to alive slot
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                
                                assertTrue(slot.setExecutedVertex(ev));
-                               assertEquals(ev, slot.getExecutedVertex());
+                               assertEquals(ev, slot.getExecution());
                                
                                // try to add another one
                                assertFalse(slot.setExecutedVertex(ev_2));
-                               assertEquals(ev, slot.getExecutedVertex());
+                               assertEquals(ev, slot.getExecution());
                        }
                        
                        // assign to canceled slot
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                slot.cancel();
                                
                                assertFalse(slot.setExecutedVertex(ev));
-                               assertNull(slot.getExecutedVertex());
+                               assertNull(slot.getExecution());
                        }
                        
                        // assign to released
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                slot.releaseSlot();
                                
                                assertFalse(slot.setExecutedVertex(ev));
-                               assertNull(slot.getExecutedVertex());
+                               assertNull(slot.getExecution());
                        }
                }
                catch (Exception e) {
@@ -113,9 +113,9 @@ public class AllocatedSlotTest {
                try {
                        Execution ev = mock(Execution.class);
                        
-                       AllocatedSlot slot = getSlot();
+                       SimpleSlot slot = getSlot();
                        assertTrue(slot.setExecutedVertex(ev));
-                       assertEquals(ev, slot.getExecutedVertex());
+                       assertEquals(ev, slot.getExecution());
                        
                        slot.cancel();
                        slot.releaseSlot();
@@ -129,12 +129,12 @@ public class AllocatedSlotTest {
                }
        }
        
-       public static AllocatedSlot getSlot() throws Exception {
+       public static SimpleSlot getSlot() throws Exception {
                HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
                InetAddress address = InetAddress.getByName("127.0.0.1");
                InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10000, 10001);
                
                Instance instance = new Instance(connection, new InstanceID(), 
hardwareDescription, 1);
-               return instance.allocateSlot(new JobID());
+               return instance.allocateSimpleSlot(new JobID());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 3a80357..5e008b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -44,10 +44,10 @@ public class InstanceTest {
                        assertEquals(4, instance.getNumberOfAvailableSlots());
                        assertEquals(0, instance.getNumberOfAllocatedSlots());
                        
-                       AllocatedSlot slot1 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot2 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot3 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot4 = instance.allocateSlot(new 
JobID());
+                       SimpleSlot slot1 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot2 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot3 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot4 = instance.allocateSimpleSlot(new 
JobID());
                        
                        assertNotNull(slot1);
                        assertNotNull(slot2);
@@ -60,7 +60,7 @@ public class InstanceTest {
                                        slot3.getSlotNumber() + 
slot4.getSlotNumber());
                        
                        // no more slots
-                       assertNull(instance.allocateSlot(new JobID()));
+                       assertNull(instance.allocateSimpleSlot(new JobID()));
                        try {
                                instance.returnAllocatedSlot(slot2);
                                fail("instance accepted a non-cancelled slot.");
@@ -108,9 +108,9 @@ public class InstanceTest {
                        
                        assertEquals(3, instance.getNumberOfAvailableSlots());
                        
-                       AllocatedSlot slot1 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot2 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot3 = instance.allocateSlot(new 
JobID());
+                       SimpleSlot slot1 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot2 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot3 = instance.allocateSimpleSlot(new 
JobID());
                        
                        instance.markDead();
                        
@@ -138,9 +138,9 @@ public class InstanceTest {
                        
                        assertEquals(3, instance.getNumberOfAvailableSlots());
                        
-                       AllocatedSlot slot1 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot2 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot3 = instance.allocateSlot(new 
JobID());
+                       SimpleSlot slot1 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot2 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot3 = instance.allocateSimpleSlot(new 
JobID());
                        
                        instance.cancelAndReleaseAllSlots();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index cc767c2..5e93cbe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -103,4 +103,20 @@ public class JobManagerTestUtils {
                        }
                }
        }
+
+       public static void killRandomHeartbeatThread() throws 
InterruptedException {
+               Thread[] threads = new Thread[Thread.activeCount()];
+               Thread.enumerate(threads);
+
+               for (Thread t : threads) {
+                       if (t == null) {
+                               continue;
+                       }
+                       if (t.getName().equals("Heartbeat Thread")) {
+                               t.stop();
+                               t.join();
+                               return;
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
index 6b8be15..888da9c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import static 
org.apache.flink.runtime.jobgraph.JobManagerTestUtils.killRandomHeartbeatThread;
 import static 
org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
 import static 
org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
 import static org.junit.Assert.*;
@@ -112,4 +113,65 @@ public class TaskManagerFailsITCase {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testExecutionWithFailingHeartbeat() {
+               final int NUM_TASKS = 31;
+
+               try {
+                       final AbstractJobVertex sender = new 
AbstractJobVertex("Sender");
+                       final AbstractJobVertex receiver = new 
AbstractJobVertex("Receiver");
+                       sender.setInvokableClass(Sender.class);
+                       receiver.setInvokableClass(BlockingReceiver.class);
+                       sender.setParallelism(NUM_TASKS);
+                       receiver.setParallelism(NUM_TASKS);
+                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE);
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
+
+                       final JobManager jm = startJobManager(2, NUM_TASKS);
+
+                       try {
+
+                               JobSubmissionResult result = 
jm.submitJob(jobGraph);
+
+                               if (result.getReturnCode() != 
AbstractJobResult.ReturnCode.SUCCESS) {
+                                       
System.out.println(result.getDescription());
+                               }
+                               
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+                               ExecutionGraph eg = 
jm.getCurrentJobs().get(jobGraph.getJobID());
+
+                               // wait until everyone has settled in
+                               long deadline = System.currentTimeMillis() + 
2000;
+                               while (System.currentTimeMillis() < deadline) {
+
+                                       boolean allrunning = true;
+                                       for (ExecutionVertex v : 
eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+                                               if 
(v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+                                                       allrunning = false;
+                                                       break;
+                                               }
+                                       }
+
+                                       if (allrunning) {
+                                               break;
+                                       }
+                                       Thread.sleep(200);
+                               }
+
+                               // kill one task manager's heartbeat
+                               killRandomHeartbeatThread();
+
+                               eg.waitForJobEnd();
+                       }
+                       finally {
+                               jm.shutdown();
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
index 239ab9d..5680f47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import static 
org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
 import static 
org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static 
org.apache.flink.runtime.jobgraph.JobManagerTestUtils.killRandomHeartbeatThread;
 import static org.junit.Assert.*;
 
 import org.apache.flink.runtime.client.AbstractJobResult;
@@ -115,4 +116,67 @@ public class TaskManagerFailsWithSlotSharingITCase {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testExecutionWithFailingHeartbeat() {
+               final int NUM_TASKS = 20;
+
+               try {
+                       final AbstractJobVertex sender = new 
AbstractJobVertex("Sender");
+                       final AbstractJobVertex receiver = new 
AbstractJobVertex("Receiver");
+                       sender.setInvokableClass(Sender.class);
+                       receiver.setInvokableClass(BlockingReceiver.class);
+                       sender.setParallelism(NUM_TASKS);
+                       receiver.setParallelism(NUM_TASKS);
+                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE);
+
+                       SlotSharingGroup sharingGroup = new SlotSharingGroup();
+                       sender.setSlotSharingGroup(sharingGroup);
+                       receiver.setSlotSharingGroup(sharingGroup);
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
+
+                       final JobManager jm = startJobManager(2, NUM_TASKS / 2);
+
+                       try {
+                               JobSubmissionResult result = 
jm.submitJob(jobGraph);
+
+                               if (result.getReturnCode() != 
AbstractJobResult.ReturnCode.SUCCESS) {
+                                       
System.out.println(result.getDescription());
+                               }
+                               
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+                               ExecutionGraph eg = 
jm.getCurrentJobs().get(jobGraph.getJobID());
+
+                               // wait until everyone has settled in
+                               long deadline = System.currentTimeMillis() + 
2000;
+                               while (System.currentTimeMillis() < deadline) {
+
+                                       boolean allrunning = true;
+                                       for (ExecutionVertex v : 
eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+                                               if 
(v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+                                                       allrunning = false;
+                                                       break;
+                                               }
+                                       }
+
+                                       if (allrunning) {
+                                               break;
+                                       }
+                                       Thread.sleep(200);
+                               }
+
+                               killRandomHeartbeatThread();
+
+                               eg.waitForJobEnd();
+                       }
+                       finally {
+                               jm.shutdown();
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
 }

Reply via email to