Repository: flink Updated Branches: refs/heads/release-0.9 ecfde6dd9 -> f5f0709c9
[FLINK-2206] Fix incorrect counts of finished, canceled, and failed jobs in webinterface This closes #826 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e513be72 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e513be72 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e513be72 Branch: refs/heads/release-0.9 Commit: e513be72a486b4f2e13c617eb6d4d08c03503ae7 Parents: ecfde6d Author: Fabian Hueske <fhue...@apache.org> Authored: Fri Jun 12 01:45:03 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Jun 12 14:26:47 2015 +0200 ---------------------------------------------------------------------- .../jobmanager/web/JobManagerInfoServlet.java | 31 +++++++++++++++++ .../js/jobmanagerFrontend.js | 36 +++++++++----------- .../runtime/jobmanager/MemoryArchivist.scala | 17 +++++++++ .../runtime/messages/ArchiveMessages.scala | 11 +++++- 4 files changed, 75 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java index 6d58306..3fc3c82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java @@ -66,6 +66,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.StringUtils; import org.eclipse.jetty.io.EofException; +import scala.Tuple3; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -117,6 +118,20 @@ public class JobManagerInfoServlet extends HttpServlet { writeJsonForArchive(resp.getWriter(), archivedJobs); } } + else if("jobcounts".equals(req.getParameter("get"))) { + response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(), + new Timeout(timeout)); + + result = Await.result(response, timeout); + + if(!(result instanceof Tuple3)) { + throw new RuntimeException("RequestJobCounts requires a response of type " + + "Tuple3. Instead the response is of type " + result.getClass() + + "."); + } else { + writeJsonForJobCounts(resp.getWriter(), (Tuple3)result); + } + } else if("job".equals(req.getParameter("get"))) { String jobId = req.getParameter("job"); @@ -341,6 +356,22 @@ public class JobManagerInfoServlet extends HttpServlet { } /** + * Writes Json with the job counts + * + * @param wrt + * @param counts + */ + private void writeJsonForJobCounts(PrintWriter wrt, Tuple3<Integer, Integer, Integer> jobCounts) { + + wrt.write("{"); + wrt.write("\"finished\": " + jobCounts._1() + ","); + wrt.write("\"canceled\": " + jobCounts._2() + ","); + wrt.write("\"failed\": " + jobCounts._3()); + wrt.write("}"); + + } + + /** * Writes infos about archived job in Json format, including groupvertices and groupverticetimes * * @param wrt http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js index 92f6979..63d287c 100644 --- a/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js +++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js @@ -81,6 +81,22 @@ function poll(jobId) { })(); /* + * Polls the job execution counts on page load and every 2 seconds + */ +(function pollJobCounts() { + $.ajax({ url : "jobsInfo?get=jobcounts", cache: false, type : "GET", + success : function(json) { + + $("#jobs-finished").html(json.finished); + $("#jobs-canceled").html(json.canceled); + $("#jobs-failed").html(json.failed); + + }, dataType : "json", + }); + setTimeout(pollJobCounts, 2000); +})(); + +/* * Polls the number of taskmanagers on page load */ (function pollTaskmanagers() { @@ -418,20 +434,12 @@ function updateTable(json) { } } -var archive_finished = 0; -var archive_failed = 0; -var archive_canceled = 0; - /* * Creates job history table */ function fillTableArchive(table, json) { $(table).html(""); - - $("#jobs-finished").html(archive_finished); - $("#jobs-failed").html(archive_failed); - $("#jobs-canceled").html(archive_canceled); - + $.each(json, function(i, job) { _fillTableArchive(table, job, false) }); @@ -459,14 +467,4 @@ function _fillTableArchive(table, job, prepend) { + job.jobname + " (" + formattedTimeFromTimestamp(parseInt(job.time)) + ")</a></li>"); - if (job.status == "FINISHED") - archive_finished++; - if (job.status == "FAILED") - archive_failed++; - if (job.status == "CANCELED") - archive_canceled++; - - $("#jobs-finished").html(archive_finished); - $("#jobs-failed").html(archive_failed); - $("#jobs-canceled").html(archive_canceled); } http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 62ea435..54d2f2f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.Actor import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.executiongraph.ExecutionGraph import org.apache.flink.runtime.messages.ArchiveMessages._ @@ -45,6 +46,8 @@ import scala.collection.mutable * then a [[CurrentJobStatus]] message with the last state is returned to the sender, otherwise * a [[JobNotFound]] message is returned * + * - [[RequestJobCounts]] returns the number of finished, canceled, and failed jobs as a Tuple3 + * * @param max_entries Maximum number of stored Flink jobs */ class MemoryArchivist(private val max_entries: Int) @@ -57,12 +60,23 @@ class MemoryArchivist(private val max_entries: Int) */ val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]() + /* Counters for finished, canceled, and failed jobs */ + var finishedCnt: Int = 0 + var canceledCnt: Int = 0 + var failedCnt: Int = 0 + override def receiveWithLogMessages: Receive = { /* Receive Execution Graph to archive */ case ArchiveExecutionGraph(jobID, graph) => // wrap graph inside a soft reference graphs.update(jobID, graph) + // update job counters + graph.getState match { + case JobStatus.FINISHED => finishedCnt += 1 + case JobStatus.CANCELED => canceledCnt += 1 + case JobStatus.FAILED => failedCnt += 1 + } trimHistory() case RequestArchivedJob(jobID: JobID) => @@ -83,6 +97,9 @@ class MemoryArchivist(private val max_entries: Int) case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState) case None => sender ! JobNotFound(jobID) } + + case RequestJobCounts => + sender ! (finishedCnt, canceledCnt, failedCnt) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala index e9e7dec..c4e3f3e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala @@ -34,6 +34,11 @@ object ArchiveMessages { case object RequestArchivedJobs /** + * Requests the number of finished, canceled, and failed jobs + */ + case object RequestJobCounts + + /** * Reqeuest a specific ExecutionGraph by JobID. The response is [[RequestArchivedJob]] * @param jobID */ @@ -56,7 +61,7 @@ object ArchiveMessages { jobs.asJavaCollection } } - + // -------------------------------------------------------------------------- // Utility methods to allow simpler case object access from Java // -------------------------------------------------------------------------- @@ -64,4 +69,8 @@ object ArchiveMessages { def getRequestArchivedJobs : AnyRef = { RequestArchivedJobs } + + def getRequestJobCounts : AnyRef = { + RequestJobCounts + } }