http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java deleted file mode 100644 index 0b19a5f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.scheduler; - -import org.apache.flink.runtime.AbstractID; -import org.apache.flink.runtime.instance.AllocatedSlot; - -public class SubSlot extends AllocatedSlot { - - private final SharedSlot sharedSlot; - - private final AbstractID groupId; - - private final int subSlotNumber; - - - public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID groupId) { - super(sharedSlot.getAllocatedSlot().getJobID(), - sharedSlot.getAllocatedSlot().getInstance(), - sharedSlot.getAllocatedSlot().getSlotNumber()); - - this.sharedSlot = sharedSlot; - this.groupId = groupId; - this.subSlotNumber = subSlotNumber; - } - - // -------------------------------------------------------------------------------------------- - - public void releaseSlot() { - // cancel everything, if there is something. since this is atomically status based, - // it will not happen twice if another attempt happened before or concurrently - try { - cancel(); - } - finally { - if (markReleased()) { - this.sharedSlot.returnAllocatedSlot(this); - } - } - } - - public SharedSlot getSharedSlot() { - return this.sharedSlot; - } - - public AbstractID getGroupId() { - return groupId; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return "SubSlot " + subSlotNumber + " (" + super.toString() + ')'; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java new file mode 100644 index 0000000..10796b6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.web; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.flink.runtime.instance.SimpleSlot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.runtime.event.job.AbstractEvent; +import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent; +import org.apache.flink.runtime.event.job.JobEvent; +import org.apache.flink.runtime.event.job.RecentJobEvent; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.StringUtils; +import org.eclipse.jetty.io.EofException; + +public class JobManagerInfoServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class); + + /** Underlying JobManager */ + private final JobManager jobmanager; + + + public JobManagerInfoServlet(JobManager jobmanager) { + this.jobmanager = jobmanager; + } + + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentType("application/json"); + + try { + if("archive".equals(req.getParameter("get"))) { + writeJsonForArchive(resp.getWriter(), jobmanager.getOldJobs()); + } + else if("job".equals(req.getParameter("get"))) { + String jobId = req.getParameter("job"); + writeJsonForArchivedJob(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId))); + } + else if("groupvertex".equals(req.getParameter("get"))) { + String jobId = req.getParameter("job"); + String groupvertexId = req.getParameter("groupvertex"); + writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), JobVertexID.fromHexString(groupvertexId)); + } + else if("taskmanagers".equals(req.getParameter("get"))) { + resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getTotalNumberOfRegisteredSlots()+"}"); + } + else if("cancel".equals(req.getParameter("get"))) { + String jobId = req.getParameter("job"); + jobmanager.cancelJob(JobID.fromHexString(jobId)); + } + else if("updates".equals(req.getParameter("get"))) { + String jobId = req.getParameter("job"); + writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId)); + } else if ("version".equals(req.getParameter("get"))) { + writeJsonForVersion(resp.getWriter()); + } + else{ + writeJsonForJobs(resp.getWriter(), jobmanager.getRecentJobs()); + } + + } catch (Exception e) { + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + resp.getWriter().print(e.getMessage()); + if (LOG.isWarnEnabled()) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + } + + /** + * Writes ManagementGraph as Json for all recent jobs + * + * @param wrt + * @param jobs + */ + private void writeJsonForJobs(PrintWriter wrt, List<RecentJobEvent> jobs) { + + try { + + wrt.write("["); + + // Loop Jobs + for (int i = 0; i < jobs.size(); i++) { + RecentJobEvent jobEvent = jobs.get(i); + + writeJsonForJob(wrt, jobEvent); + + //Write seperator between json objects + if(i != jobs.size() - 1) { + wrt.write(","); + } + } + wrt.write("]"); + + } catch (EofException eof) { // Connection closed by client + LOG.info("Info server for jobmanager: Connection closed by client, EofException"); + } catch (IOException ioe) { // Connection closed by client + LOG.info("Info server for jobmanager: Connection closed by client, IOException"); + } + + } + + private void writeJsonForJob(PrintWriter wrt, RecentJobEvent jobEvent) throws IOException { + + ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID()); + + //Serialize job to json + wrt.write("{"); + wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\","); + wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\","); + wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\","); + wrt.write("\"time\": " + jobEvent.getTimestamp()+","); + + // Serialize ManagementGraph to json + wrt.write("\"groupvertices\": ["); + boolean first = true; + + for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { + //Write seperator between json objects + if(first) { + first = false; + } else { + wrt.write(","); } + + wrt.write(JsonFactory.toJson(groupVertex)); + } + wrt.write("]"); + wrt.write("}"); + + } + + /** + * Writes Json with a list of currently archived jobs, sorted by time + * + * @param wrt + * @param jobs + */ + private void writeJsonForArchive(PrintWriter wrt, List<RecentJobEvent> jobs) { + + wrt.write("["); + + // sort jobs by time + Collections.sort(jobs, new Comparator<RecentJobEvent>() { + @Override + public int compare(RecentJobEvent o1, RecentJobEvent o2) { + if(o1.getTimestamp() < o2.getTimestamp()) { + return 1; + } else { + return -1; + } + } + + }); + + // Loop Jobs + for (int i = 0; i < jobs.size(); i++) { + RecentJobEvent jobEvent = jobs.get(i); + + //Serialize job to json + wrt.write("{"); + wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\","); + wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\","); + wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\","); + wrt.write("\"time\": " + jobEvent.getTimestamp()); + + wrt.write("}"); + + //Write seperator between json objects + if(i != jobs.size() - 1) { + wrt.write(","); + } + } + wrt.write("]"); + + } + + /** + * Writes infos about archived job in Json format, including groupvertices and groupverticetimes + * + * @param wrt + * @param jobEvent + */ + private void writeJsonForArchivedJob(PrintWriter wrt, RecentJobEvent jobEvent) { + + try { + + wrt.write("["); + + ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID()); + + //Serialize job to json + wrt.write("{"); + wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\","); + wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\","); + wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\","); + wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ","); + wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ","); + wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ","); + wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ","); + wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ","); + + if (jobEvent.getJobStatus() == JobStatus.FAILED) { + wrt.write("\"failednodes\": ["); + boolean first = true; + for (ExecutionVertex vertex : graph.getAllExecutionVertices()) { + if (vertex.getExecutionState() == ExecutionState.FAILED) { + SimpleSlot slot = vertex.getCurrentAssignedResource(); + Throwable failureCause = vertex.getFailureCause(); + if (slot != null || failureCause != null) { + if (first) { + first = false; + } else { + wrt.write(","); + } + wrt.write("{"); + wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\","); + wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\""); + wrt.write("}"); + } + } + } + wrt.write("],"); + } + + // Serialize ManagementGraph to json + wrt.write("\"groupvertices\": ["); + boolean first = true; + for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { + //Write seperator between json objects + if(first) { + first = false; + } else { + wrt.write(","); } + + wrt.write(JsonFactory.toJson(groupVertex)); + + } + wrt.write("],"); + + // write accumulators + Map<String, Object> accMap = AccumulatorHelper.toResultMap(jobmanager.getAccumulators(jobEvent.getJobID())); + + wrt.write("\n\"accumulators\": ["); + int i = 0; + for( Entry<String, Object> accumulator : accMap.entrySet()) { + wrt.write("{ \"name\": \""+accumulator.getKey()+" (" + accumulator.getValue().getClass().getName()+")\"," + + " \"value\": \""+accumulator.getValue().toString()+"\"}\n"); + if(++i < accMap.size()) { + wrt.write(","); + } + } + wrt.write("],\n"); + + wrt.write("\"groupverticetimes\": {"); + first = true; + for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { + + if(first) { + first = false; + } else { + wrt.write(","); } + + // Calculate start and end time for groupvertex + long started = Long.MAX_VALUE; + long ended = 0; + + // Take earliest running state and latest endstate of groupmembers + for (ExecutionVertex vertex : groupVertex.getTaskVertices()) { + + long running = vertex.getStateTimestamp(ExecutionState.RUNNING); + if (running != 0 && running < started) { + started = running; + } + + long finished = vertex.getStateTimestamp(ExecutionState.FINISHED); + long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED); + long failed = vertex.getStateTimestamp(ExecutionState.FAILED); + + if(finished != 0 && finished > ended) { + ended = finished; + } + + if(canceled != 0 && canceled > ended) { + ended = canceled; + } + + if(failed != 0 && failed > ended) { + ended = failed; + } + + } + + wrt.write("\""+groupVertex.getJobVertexId()+"\": {"); + wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\","); + wrt.write("\"groupvertexname\": \"" + groupVertex + "\","); + wrt.write("\"STARTED\": "+ started + ","); + wrt.write("\"ENDED\": "+ ended); + wrt.write("}"); + + } + + wrt.write("}"); + + wrt.write("}"); + + + wrt.write("]"); + + } catch (EofException eof) { // Connection closed by client + LOG.info("Info server for jobmanager: Connection closed by client, EofException"); + } catch (IOException ioe) { // Connection closed by client + LOG.info("Info server for jobmanager: Connection closed by client, IOException"); + } + + } + + + /** + * Writes all updates (events) for a given job since a given time + * + * @param wrt + * @param jobId + */ + private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) { + + try { + + List<AbstractEvent> events = jobmanager.getEvents(jobId); + + //Serialize job to json + wrt.write("{"); + wrt.write("\"jobid\": \"" + jobId + "\","); + wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\","); + wrt.write("\"recentjobs\": ["); + + boolean first = true; + for(RecentJobEvent rje: jobmanager.getRecentJobs()) { + if(first) { + first = false; + } else { + wrt.write(","); } + + wrt.write("\""+rje.getJobID().toString()+"\""); + } + + wrt.write("],"); + + wrt.write("\"vertexevents\": ["); + + first = true; + for (AbstractEvent event: events) { + + if (event instanceof ExecutionStateChangeEvent) { + + if(first) { + first = false; + } else { + wrt.write(","); } + + ExecutionStateChangeEvent vertexevent = (ExecutionStateChangeEvent) event; + wrt.write("{"); + wrt.write("\"vertexid\": \"" + vertexevent.getExecutionAttemptID() + "\","); + wrt.write("\"newstate\": \"" + vertexevent.getNewExecutionState() + "\","); + wrt.write("\"timestamp\": \"" + vertexevent.getTimestamp() + "\""); + wrt.write("}"); + } + } + + wrt.write("],"); + + wrt.write("\"jobevents\": ["); + + first = true; + for(AbstractEvent event: events) { + + if( event instanceof JobEvent) { + + if(first) { + first = false; + } else { + wrt.write(","); } + + JobEvent jobevent = (JobEvent) event; + wrt.write("{"); + wrt.write("\"newstate\": \"" + jobevent.getCurrentJobStatus() + "\","); + wrt.write("\"timestamp\": \"" + jobevent.getTimestamp() + "\""); + wrt.write("}"); + } + } + + wrt.write("]"); + + wrt.write("}"); + + + } catch (EofException eof) { // Connection closed by client + LOG.info("Info server for jobmanager: Connection closed by client, EofException"); + } catch (IOException ioe) { // Connection closed by client + LOG.info("Info server for jobmanager: Connection closed by client, IOException"); + } + + } + + /** + * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses. + */ + private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, RecentJobEvent jobEvent, JobVertexID vertexId) { + try { + ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID()); + + ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId); + + // Serialize ManagementGraph to json + wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ","); + + wrt.write("\"verticetimes\": {"); + boolean first = true; + for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) { + + for (ExecutionVertex vertex : groupVertex.getTaskVertices()) { + + Execution exec = vertex.getCurrentExecutionAttempt(); + + if(first) { + first = false; + } else { + wrt.write(","); } + + wrt.write("\""+exec.getAttemptId() +"\": {"); + wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\","); + wrt.write("\"vertexname\": \"" + vertex + "\","); + wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ","); + wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ","); + wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ","); + wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ","); + wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ","); + wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ","); + wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ","); + wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + ""); + wrt.write("}"); + } + + } + wrt.write("}}"); + + } + catch (IOException ioe) { // Connection closed by client + String message = "Info server for jobmanager: Connection closed by client - " + ioe.getClass().getSimpleName(); + + if (LOG.isDebugEnabled()) { + LOG.debug(message, ioe); + } + else if (LOG.isInfoEnabled()) { + LOG.info(message); + } + } + } + + /** + * Writes the version and the revision of Flink. + * + * @param wrt + */ + private void writeJsonForVersion(PrintWriter wrt) { + wrt.write("{"); + wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\","); + wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\""); + wrt.write("}"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java deleted file mode 100644 index ef5a246..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java +++ /dev/null @@ -1,517 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.web; - -import java.io.IOException; -import java.io.PrintWriter; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.runtime.event.job.AbstractEvent; -import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent; -import org.apache.flink.runtime.event.job.JobEvent; -import org.apache.flink.runtime.event.job.RecentJobEvent; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.AllocatedSlot; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.StringUtils; -import org.eclipse.jetty.io.EofException; - - -public class JobmanagerInfoServlet extends HttpServlet { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(JobmanagerInfoServlet.class); - - /** Underlying JobManager */ - private final JobManager jobmanager; - - - public JobmanagerInfoServlet(JobManager jobmanager) { - this.jobmanager = jobmanager; - } - - - @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - - resp.setStatus(HttpServletResponse.SC_OK); - resp.setContentType("application/json"); - - try { - if("archive".equals(req.getParameter("get"))) { - writeJsonForArchive(resp.getWriter(), jobmanager.getOldJobs()); - } - else if("job".equals(req.getParameter("get"))) { - String jobId = req.getParameter("job"); - writeJsonForArchivedJob(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId))); - } - else if("groupvertex".equals(req.getParameter("get"))) { - String jobId = req.getParameter("job"); - String groupvertexId = req.getParameter("groupvertex"); - writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), JobVertexID.fromHexString(groupvertexId)); - } - else if("taskmanagers".equals(req.getParameter("get"))) { - resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getTotalNumberOfRegisteredSlots()+"}"); - } - else if("cancel".equals(req.getParameter("get"))) { - String jobId = req.getParameter("job"); - jobmanager.cancelJob(JobID.fromHexString(jobId)); - } - else if("updates".equals(req.getParameter("get"))) { - String jobId = req.getParameter("job"); - writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId)); - } else if ("version".equals(req.getParameter("get"))) { - writeJsonForVersion(resp.getWriter()); - } - else{ - writeJsonForJobs(resp.getWriter(), jobmanager.getRecentJobs()); - } - - } catch (Exception e) { - resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); - resp.getWriter().print(e.getMessage()); - if (LOG.isWarnEnabled()) { - LOG.warn(StringUtils.stringifyException(e)); - } - } - } - - /** - * Writes ManagementGraph as Json for all recent jobs - * - * @param wrt - * @param jobs - */ - private void writeJsonForJobs(PrintWriter wrt, List<RecentJobEvent> jobs) { - - try { - - wrt.write("["); - - // Loop Jobs - for (int i = 0; i < jobs.size(); i++) { - RecentJobEvent jobEvent = jobs.get(i); - - writeJsonForJob(wrt, jobEvent); - - //Write seperator between json objects - if(i != jobs.size() - 1) { - wrt.write(","); - } - } - wrt.write("]"); - - } catch (EofException eof) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, EofException"); - } catch (IOException ioe) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, IOException"); - } - - } - - private void writeJsonForJob(PrintWriter wrt, RecentJobEvent jobEvent) throws IOException { - - ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID()); - - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\","); - wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\","); - wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\","); - wrt.write("\"time\": " + jobEvent.getTimestamp()+","); - - // Serialize ManagementGraph to json - wrt.write("\"groupvertices\": ["); - boolean first = true; - - for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { - //Write seperator between json objects - if(first) { - first = false; - } else { - wrt.write(","); } - - wrt.write(JsonFactory.toJson(groupVertex)); - } - wrt.write("]"); - wrt.write("}"); - - } - - /** - * Writes Json with a list of currently archived jobs, sorted by time - * - * @param wrt - * @param jobs - */ - private void writeJsonForArchive(PrintWriter wrt, List<RecentJobEvent> jobs) { - - wrt.write("["); - - // sort jobs by time - Collections.sort(jobs, new Comparator<RecentJobEvent>() { - @Override - public int compare(RecentJobEvent o1, RecentJobEvent o2) { - if(o1.getTimestamp() < o2.getTimestamp()) { - return 1; - } else { - return -1; - } - } - - }); - - // Loop Jobs - for (int i = 0; i < jobs.size(); i++) { - RecentJobEvent jobEvent = jobs.get(i); - - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\","); - wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\","); - wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\","); - wrt.write("\"time\": " + jobEvent.getTimestamp()); - - wrt.write("}"); - - //Write seperator between json objects - if(i != jobs.size() - 1) { - wrt.write(","); - } - } - wrt.write("]"); - - } - - /** - * Writes infos about archived job in Json format, including groupvertices and groupverticetimes - * - * @param wrt - * @param jobEvent - */ - private void writeJsonForArchivedJob(PrintWriter wrt, RecentJobEvent jobEvent) { - - try { - - wrt.write("["); - - ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID()); - - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\","); - wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\","); - wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\","); - wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ","); - wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ","); - wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ","); - wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ","); - wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ","); - - if (jobEvent.getJobStatus() == JobStatus.FAILED) { - wrt.write("\"failednodes\": ["); - boolean first = true; - for (ExecutionVertex vertex : graph.getAllExecutionVertices()) { - if (vertex.getExecutionState() == ExecutionState.FAILED) { - AllocatedSlot slot = vertex.getCurrentAssignedResource(); - Throwable failureCause = vertex.getFailureCause(); - if (slot != null || failureCause != null) { - if (first) { - first = false; - } else { - wrt.write(","); - } - wrt.write("{"); - wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\","); - wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\""); - wrt.write("}"); - } - } - } - wrt.write("],"); - } - - // Serialize ManagementGraph to json - wrt.write("\"groupvertices\": ["); - boolean first = true; - for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { - //Write seperator between json objects - if(first) { - first = false; - } else { - wrt.write(","); } - - wrt.write(JsonFactory.toJson(groupVertex)); - - } - wrt.write("],"); - - // write accumulators - Map<String, Object> accMap = AccumulatorHelper.toResultMap(jobmanager.getAccumulators(jobEvent.getJobID())); - - wrt.write("\n\"accumulators\": ["); - int i = 0; - for( Entry<String, Object> accumulator : accMap.entrySet()) { - wrt.write("{ \"name\": \""+accumulator.getKey()+" (" + accumulator.getValue().getClass().getName()+")\"," - + " \"value\": \""+accumulator.getValue().toString()+"\"}\n"); - if(++i < accMap.size()) { - wrt.write(","); - } - } - wrt.write("],\n"); - - wrt.write("\"groupverticetimes\": {"); - first = true; - for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { - - if(first) { - first = false; - } else { - wrt.write(","); } - - // Calculate start and end time for groupvertex - long started = Long.MAX_VALUE; - long ended = 0; - - // Take earliest running state and latest endstate of groupmembers - for (ExecutionVertex vertex : groupVertex.getTaskVertices()) { - - long running = vertex.getStateTimestamp(ExecutionState.RUNNING); - if (running != 0 && running < started) { - started = running; - } - - long finished = vertex.getStateTimestamp(ExecutionState.FINISHED); - long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED); - long failed = vertex.getStateTimestamp(ExecutionState.FAILED); - - if(finished != 0 && finished > ended) { - ended = finished; - } - - if(canceled != 0 && canceled > ended) { - ended = canceled; - } - - if(failed != 0 && failed > ended) { - ended = failed; - } - - } - - wrt.write("\""+groupVertex.getJobVertexId()+"\": {"); - wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\","); - wrt.write("\"groupvertexname\": \"" + groupVertex + "\","); - wrt.write("\"STARTED\": "+ started + ","); - wrt.write("\"ENDED\": "+ ended); - wrt.write("}"); - - } - - wrt.write("}"); - - wrt.write("}"); - - - wrt.write("]"); - - } catch (EofException eof) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, EofException"); - } catch (IOException ioe) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, IOException"); - } - - } - - - /** - * Writes all updates (events) for a given job since a given time - * - * @param wrt - * @param jobId - */ - private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) { - - try { - - List<AbstractEvent> events = jobmanager.getEvents(jobId); - - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + jobId + "\","); - wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\","); - wrt.write("\"recentjobs\": ["); - - boolean first = true; - for(RecentJobEvent rje: jobmanager.getRecentJobs()) { - if(first) { - first = false; - } else { - wrt.write(","); } - - wrt.write("\""+rje.getJobID().toString()+"\""); - } - - wrt.write("],"); - - wrt.write("\"vertexevents\": ["); - - first = true; - for (AbstractEvent event: events) { - - if (event instanceof ExecutionStateChangeEvent) { - - if(first) { - first = false; - } else { - wrt.write(","); } - - ExecutionStateChangeEvent vertexevent = (ExecutionStateChangeEvent) event; - wrt.write("{"); - wrt.write("\"vertexid\": \"" + vertexevent.getExecutionAttemptID() + "\","); - wrt.write("\"newstate\": \"" + vertexevent.getNewExecutionState() + "\","); - wrt.write("\"timestamp\": \"" + vertexevent.getTimestamp() + "\""); - wrt.write("}"); - } - } - - wrt.write("],"); - - wrt.write("\"jobevents\": ["); - - first = true; - for(AbstractEvent event: events) { - - if( event instanceof JobEvent) { - - if(first) { - first = false; - } else { - wrt.write(","); } - - JobEvent jobevent = (JobEvent) event; - wrt.write("{"); - wrt.write("\"newstate\": \"" + jobevent.getCurrentJobStatus() + "\","); - wrt.write("\"timestamp\": \"" + jobevent.getTimestamp() + "\""); - wrt.write("}"); - } - } - - wrt.write("]"); - - wrt.write("}"); - - - } catch (EofException eof) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, EofException"); - } catch (IOException ioe) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, IOException"); - } - - } - - /** - * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses. - */ - private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, RecentJobEvent jobEvent, JobVertexID vertexId) { - try { - ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID()); - - ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId); - - // Serialize ManagementGraph to json - wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ","); - - wrt.write("\"verticetimes\": {"); - boolean first = true; - for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) { - - for (ExecutionVertex vertex : groupVertex.getTaskVertices()) { - - Execution exec = vertex.getCurrentExecutionAttempt(); - - if(first) { - first = false; - } else { - wrt.write(","); } - - wrt.write("\""+exec.getAttemptId() +"\": {"); - wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\","); - wrt.write("\"vertexname\": \"" + vertex + "\","); - wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ","); - wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ","); - wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ","); - wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ","); - wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ","); - wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ","); - wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ","); - wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + ""); - wrt.write("}"); - } - - } - wrt.write("}}"); - - } - catch (IOException ioe) { // Connection closed by client - String message = "Info server for jobmanager: Connection closed by client - " + ioe.getClass().getSimpleName(); - - if (LOG.isDebugEnabled()) { - LOG.debug(message, ioe); - } - else if (LOG.isInfoEnabled()) { - LOG.info(message); - } - } - } - - /** - * Writes the version and the revision of Flink. - * - * @param wrt - */ - private void writeJsonForVersion(PrintWriter wrt) { - wrt.write("{"); - wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\","); - wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\""); - wrt.write("}"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java index 1469a4c..0447b00 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.IntermediateResult; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.io.network.channels.ChannelType; import org.apache.flink.util.StringUtils; @@ -39,7 +39,7 @@ public class JsonFactory { json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\","); json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\","); - AllocatedSlot slot = vertex.getCurrentAssignedResource(); + SimpleSlot slot = vertex.getCurrentAssignedResource(); String instanceName = slot == null ? "(null)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname(); json.append("\"vertexinstancename\": \"" + instanceName + "\""); http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index 9766ab6..31800e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -124,7 +124,7 @@ public class WebInfoServer { // ----- the handlers for the servlets ----- ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS); servletContext.setContextPath("/"); - servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager)), "/jobsInfo"); + servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager)), "/jobsInfo"); servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo"); servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo"); servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu"); http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java index 241d6cd..f9126fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java @@ -26,13 +26,12 @@ import java.util.Set; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData; import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent; - public class JobProfilingData { private final ExecutionGraph executionGraph; @@ -59,7 +58,7 @@ public class JobProfilingData { public boolean addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData instanceProfilingData) { for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) { - AllocatedSlot slot = executionVertex.getCurrentAssignedResource(); + SimpleSlot slot = executionVertex.getCurrentAssignedResource(); if (slot != null && slot.getInstance().getInstanceConnectionInfo().equals( instanceProfilingData.getInstanceConnectionInfo())) { @@ -76,7 +75,7 @@ public class JobProfilingData { final Set<Instance> tempSet = new HashSet<Instance>(); for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) { - AllocatedSlot slot = executionVertex.getCurrentAssignedResource(); + SimpleSlot slot = executionVertex.getCurrentAssignedResource(); if (slot != null) { tempSet.add(slot.getInstance()); } http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java index 1c23293..e6e5287 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java @@ -423,7 +423,7 @@ public class TaskManager implements TaskOperationProtocol { LOG.info("Shutting down TaskManager"); cancelAndClearEverything(new Exception("Task Manager is shutting down")); - + // first, stop the heartbeat thread and wait for it to terminate this.heartbeatThread.interrupt(); try { http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 14ce15a..cf63e43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -38,8 +38,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobID; @@ -117,7 +117,7 @@ public class ExecutionGraphDeploymentTest { }); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(jobId); + final SimpleSlot slot = instance.allocateSimpleSlot(jobId); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 30b05ee..c685ec0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -32,11 +32,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -67,7 +67,7 @@ public class ExecutionGraphTestUtils { } } - public static void setVertexResource(ExecutionVertex vertex, AllocatedSlot slot) { + public static void setVertexResource(ExecutionVertex vertex, SimpleSlot slot) { try { Execution exec = vertex.getCurrentExecutionAttempt(); http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 2848466..f0c9342 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -25,7 +25,7 @@ import static org.mockito.Mockito.mock; import java.util.Arrays; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -56,7 +56,7 @@ public class ExecutionStateProgressTest { // mock resources and mock taskmanager TaskOperationProtocol taskManager = getSimpleAcknowledgingTaskmanager(); for (ExecutionVertex ee : ejv.getTaskVertices()) { - AllocatedSlot slot = getInstance(taskManager).allocateSlot(jid); + SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid); ee.deployToSlot(slot); } http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 9769529..ff1ef5a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -29,7 +29,7 @@ import java.io.IOException; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -120,7 +120,7 @@ public class ExecutionVertexCancelTest { when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true), new TaskOperationResult(execId, false)); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -193,7 +193,7 @@ public class ExecutionVertexCancelTest { when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, false), new TaskOperationResult(execId, true)); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -260,7 +260,7 @@ public class ExecutionVertexCancelTest { when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true)); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -301,7 +301,7 @@ public class ExecutionVertexCancelTest { when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true)); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -353,7 +353,7 @@ public class ExecutionVertexCancelTest { when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, false)); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -389,7 +389,7 @@ public class ExecutionVertexCancelTest { when(taskManager.cancelTask(execId)).thenThrow(new IOException("RPC call failed")); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -428,7 +428,7 @@ public class ExecutionVertexCancelTest { when(taskManager.cancelTask(execId)).thenThrow(new IOException("RPC call failed")); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -484,7 +484,7 @@ public class ExecutionVertexCancelTest { try { TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); fail("Method should throw an exception"); @@ -526,7 +526,7 @@ public class ExecutionVertexCancelTest { TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); fail("Method should throw an exception"); @@ -540,7 +540,7 @@ public class ExecutionVertexCancelTest { TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.CANCELING); http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index f3081bc..5c7d58c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -30,8 +30,8 @@ import static org.mockito.Matchers.any; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.protocols.TaskOperationProtocol; @@ -52,7 +52,7 @@ public class ExecutionVertexDeploymentTest { TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid); @@ -88,7 +88,7 @@ public class ExecutionVertexDeploymentTest { // mock taskmanager to simply accept the call final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid); @@ -133,7 +133,7 @@ public class ExecutionVertexDeploymentTest { final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getJobVertexExecutingAsynchronously(jid); @@ -189,7 +189,7 @@ public class ExecutionVertexDeploymentTest { // mock taskmanager to simply accept the call final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid); @@ -225,7 +225,7 @@ public class ExecutionVertexDeploymentTest { // mock taskmanager to simply accept the call final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getJobVertexExecutingAsynchronously(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]); @@ -269,7 +269,7 @@ public class ExecutionVertexDeploymentTest { TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid); @@ -306,7 +306,7 @@ public class ExecutionVertexDeploymentTest { TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, queue); http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index e2f0ee8..e1e4549 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -25,8 +25,8 @@ import static org.mockito.Mockito.*; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; @@ -47,7 +47,7 @@ public class ExecutionVertexSchedulingTest { // a slot than cannot be deployed to final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); slot.cancel(); assertFalse(slot.isReleased()); @@ -80,7 +80,7 @@ public class ExecutionVertexSchedulingTest { // a slot than cannot be deployed to final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); slot.cancel(); assertFalse(slot.isReleased()); @@ -119,7 +119,7 @@ public class ExecutionVertexSchedulingTest { // a slot than cannot be deployed to final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class); final Instance instance = getInstance(taskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java index a9af2cf..a321ad8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java @@ -35,7 +35,7 @@ public class AllocatedSlotTest { try { // cancel, then release { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); assertTrue(slot.isAlive()); slot.cancel(); @@ -51,7 +51,7 @@ public class AllocatedSlotTest { // release immediately { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); assertTrue(slot.isAlive()); slot.releaseSlot(); @@ -74,32 +74,32 @@ public class AllocatedSlotTest { // assign to alive slot { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); assertTrue(slot.setExecutedVertex(ev)); - assertEquals(ev, slot.getExecutedVertex()); + assertEquals(ev, slot.getExecution()); // try to add another one assertFalse(slot.setExecutedVertex(ev_2)); - assertEquals(ev, slot.getExecutedVertex()); + assertEquals(ev, slot.getExecution()); } // assign to canceled slot { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); slot.cancel(); assertFalse(slot.setExecutedVertex(ev)); - assertNull(slot.getExecutedVertex()); + assertNull(slot.getExecution()); } // assign to released { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); slot.releaseSlot(); assertFalse(slot.setExecutedVertex(ev)); - assertNull(slot.getExecutedVertex()); + assertNull(slot.getExecution()); } } catch (Exception e) { @@ -113,9 +113,9 @@ public class AllocatedSlotTest { try { Execution ev = mock(Execution.class); - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); assertTrue(slot.setExecutedVertex(ev)); - assertEquals(ev, slot.getExecutedVertex()); + assertEquals(ev, slot.getExecution()); slot.cancel(); slot.releaseSlot(); @@ -129,12 +129,12 @@ public class AllocatedSlotTest { } } - public static AllocatedSlot getSlot() throws Exception { + public static SimpleSlot getSlot() throws Exception { HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10000, 10001); Instance instance = new Instance(connection, new InstanceID(), hardwareDescription, 1); - return instance.allocateSlot(new JobID()); + return instance.allocateSimpleSlot(new JobID()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java index 3a80357..5e008b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java @@ -44,10 +44,10 @@ public class InstanceTest { assertEquals(4, instance.getNumberOfAvailableSlots()); assertEquals(0, instance.getNumberOfAllocatedSlots()); - AllocatedSlot slot1 = instance.allocateSlot(new JobID()); - AllocatedSlot slot2 = instance.allocateSlot(new JobID()); - AllocatedSlot slot3 = instance.allocateSlot(new JobID()); - AllocatedSlot slot4 = instance.allocateSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID()); assertNotNull(slot1); assertNotNull(slot2); @@ -60,7 +60,7 @@ public class InstanceTest { slot3.getSlotNumber() + slot4.getSlotNumber()); // no more slots - assertNull(instance.allocateSlot(new JobID())); + assertNull(instance.allocateSimpleSlot(new JobID())); try { instance.returnAllocatedSlot(slot2); fail("instance accepted a non-cancelled slot."); @@ -108,9 +108,9 @@ public class InstanceTest { assertEquals(3, instance.getNumberOfAvailableSlots()); - AllocatedSlot slot1 = instance.allocateSlot(new JobID()); - AllocatedSlot slot2 = instance.allocateSlot(new JobID()); - AllocatedSlot slot3 = instance.allocateSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); instance.markDead(); @@ -138,9 +138,9 @@ public class InstanceTest { assertEquals(3, instance.getNumberOfAvailableSlots()); - AllocatedSlot slot1 = instance.allocateSlot(new JobID()); - AllocatedSlot slot2 = instance.allocateSlot(new JobID()); - AllocatedSlot slot3 = instance.allocateSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); instance.cancelAndReleaseAllSlots(); http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java index cc767c2..5e93cbe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java @@ -103,4 +103,20 @@ public class JobManagerTestUtils { } } } + + public static void killRandomHeartbeatThread() throws InterruptedException { + Thread[] threads = new Thread[Thread.activeCount()]; + Thread.enumerate(threads); + + for (Thread t : threads) { + if (t == null) { + continue; + } + if (t.getName().equals("Heartbeat Thread")) { + t.stop(); + t.join(); + return; + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java index 6b8be15..888da9c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmanager; +import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.killRandomHeartbeatThread; import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager; import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated; import static org.junit.Assert.*; @@ -112,4 +113,65 @@ public class TaskManagerFailsITCase { fail(e.getMessage()); } } + + @Test + public void testExecutionWithFailingHeartbeat() { + final int NUM_TASKS = 31; + + try { + final AbstractJobVertex sender = new AbstractJobVertex("Sender"); + final AbstractJobVertex receiver = new AbstractJobVertex("Receiver"); + sender.setInvokableClass(Sender.class); + receiver.setInvokableClass(BlockingReceiver.class); + sender.setParallelism(NUM_TASKS); + receiver.setParallelism(NUM_TASKS); + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); + + final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver); + + final JobManager jm = startJobManager(2, NUM_TASKS); + + try { + + JobSubmissionResult result = jm.submitJob(jobGraph); + + if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) { + System.out.println(result.getDescription()); + } + assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode()); + + ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID()); + + // wait until everyone has settled in + long deadline = System.currentTimeMillis() + 2000; + while (System.currentTimeMillis() < deadline) { + + boolean allrunning = true; + for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) { + if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) { + allrunning = false; + break; + } + } + + if (allrunning) { + break; + } + Thread.sleep(200); + } + + // kill one task manager's heartbeat + killRandomHeartbeatThread(); + + eg.waitForJobEnd(); + } + finally { + jm.shutdown(); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java index 239ab9d..5680f47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager; import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager; import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated; +import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.killRandomHeartbeatThread; import static org.junit.Assert.*; import org.apache.flink.runtime.client.AbstractJobResult; @@ -115,4 +116,67 @@ public class TaskManagerFailsWithSlotSharingITCase { fail(e.getMessage()); } } + + @Test + public void testExecutionWithFailingHeartbeat() { + final int NUM_TASKS = 20; + + try { + final AbstractJobVertex sender = new AbstractJobVertex("Sender"); + final AbstractJobVertex receiver = new AbstractJobVertex("Receiver"); + sender.setInvokableClass(Sender.class); + receiver.setInvokableClass(BlockingReceiver.class); + sender.setParallelism(NUM_TASKS); + receiver.setParallelism(NUM_TASKS); + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); + + SlotSharingGroup sharingGroup = new SlotSharingGroup(); + sender.setSlotSharingGroup(sharingGroup); + receiver.setSlotSharingGroup(sharingGroup); + + final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver); + + final JobManager jm = startJobManager(2, NUM_TASKS / 2); + + try { + JobSubmissionResult result = jm.submitJob(jobGraph); + + if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) { + System.out.println(result.getDescription()); + } + assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode()); + + ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID()); + + // wait until everyone has settled in + long deadline = System.currentTimeMillis() + 2000; + while (System.currentTimeMillis() < deadline) { + + boolean allrunning = true; + for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) { + if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) { + allrunning = false; + break; + } + } + + if (allrunning) { + break; + } + Thread.sleep(200); + } + + killRandomHeartbeatThread(); + + eg.waitForJobEnd(); + } + finally { + jm.shutdown(); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } }
