[ 
https://issues.apache.org/jira/browse/FLINK-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15618451#comment-15618451
 ] 

ASF GitHub Bot commented on FLINK-4888:
---------------------------------------

Github user philippbussche commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2683#discussion_r85644418
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
    @@ -1828,6 +1828,33 @@ class JobManager(
         jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new 
Gauge[Long] {
           override def getValue: Long = JobManager.this.currentJobs.size
         })
    +    jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numFailedJobs", new 
Gauge[Long] {
    +      override def getValue: Long = {
    +         var failedJobs = 0
    +         val ourJobs = createJobStatusOverview()
    +         val future = (archive ? 
RequestJobsOverview.getInstance())(timeout)
    +         val archivedJobs : JobsOverview = Await.result(future, 
timeout).asInstanceOf[JobsOverview]
    +         failedJobs += ourJobs.getNumJobsFailed() + 
archivedJobs.getNumJobsFailed()
    +         failedJobs
    +    }})
    +    jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numCancelledJobs", new 
Gauge[Long] {
    +      override def getValue: Long = {
    +         var cancelledJobs = 0
    +         val ourJobs = createJobStatusOverview()
    +         val future = (archive ? 
RequestJobsOverview.getInstance())(timeout)
    +         val archivedJobs : JobsOverview = Await.result(future, 
timeout).asInstanceOf[JobsOverview]
    +         cancelledJobs += ourJobs.getNumJobsCancelled() + 
archivedJobs.getNumJobsCancelled()
    +         cancelledJobs
    +    }})
    +    jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numFinishedJobs", new 
Gauge[Long] {
    +      override def getValue: Long = {
    +         var finishedJobs = 0
    +         val ourJobs = createJobStatusOverview()
    +         val future = (archive ? 
RequestJobsOverview.getInstance())(timeout)
    +         val archivedJobs : JobsOverview = Await.result(future, 
timeout).asInstanceOf[JobsOverview]
    +         finishedJobs += ourJobs.getNumJobsFinished() + 
archivedJobs.getNumJobsFinished()
    +         finishedJobs
    +    }})
    --- End diff --
    
    @zentol, @rmetzger, not sure if you meant something like the following to 
work around the RPC calls. I guess this would eliminate them completely, 
although the changes would be a bit more "invasive", I would think.
    What we could do is start where we receive the job status change:
    ```
    case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
    ```
    Send a RemoveJob message that carries the newJobStatus (which could be 
"cancelled", as far as I can see):
    ```
    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true, 
newJobStatus))
    ```
    Then, when handling the RemoveJob message, propagate the status to the 
method that actually removes the job:
    ```
    removeJob(graph.getJobID, clearPersistedJob, newJobStatus)
    ```
    And finally, besides taking the job out of the currentJobs map, increment 
a counter for cancelled archived jobs, or keep a separate map that stores 
archived cancelled jobs (and others for failed and finished jobs). When it is 
time to send out the monitoring metrics, we would read this map/counter rather 
than making any RPC calls at all.
    What do you guys think?
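
    A minimal sketch of the counter variant, to make the idea concrete. 
Everything in it is a hypothetical stand-in and not Flink code: 
`TerminalStatus`, `JobBookkeeping`, and the String job IDs only model the 
bookkeeping, and no actor or metrics API is used.

    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for the terminal job states; not Flink's JobStatus.
    sealed trait TerminalStatus
    case object Failed extends TerminalStatus
    case object Cancelled extends TerminalStatus
    case object Finished extends TerminalStatus

    // Hypothetical helper: tracks running jobs and counts the removed ones.
    class JobBookkeeping {
      // jobId -> job name, standing in for the currentJobs map
      private val currentJobs = mutable.Map.empty[String, String]
      // one counter per terminal status, defaulting to 0
      private val terminated =
        mutable.Map.empty[TerminalStatus, Long].withDefaultValue(0L)

      def addJob(jobId: String, name: String): Unit =
        currentJobs(jobId) = name

      // Models the proposed removeJob(..., newJobStatus): the counter is only
      // incremented if the job was actually present, so a repeated removal
      // does not count twice.
      def removeJob(jobId: String, status: TerminalStatus): Unit =
        if (currentJobs.remove(jobId).isDefined) terminated(status) += 1

      // What the gauges would read instead of asking the archive via RPC.
      def numRunningJobs: Long = currentJobs.size.toLong
      def numTerminated(status: TerminalStatus): Long = terminated(status)
    }
    ```

    A gauge such as numCancelledJobs would then just return 
`numTerminated(Cancelled)`. If the gauges are read from a different thread 
than the one updating the counters, the counters would need to be made 
thread-safe; the sketch ignores that.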


> instantiated job manager metrics missing important job statistics 
> ------------------------------------------------------------------
>
>                 Key: FLINK-4888
>                 URL: https://issues.apache.org/jira/browse/FLINK-4888
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.1.2
>            Reporter: Philipp von dem Bussche
>            Assignee: Philipp von dem Bussche
>            Priority: Minor
>
> A jobmanager is currently (only) instantiated with the following metrics: 
> taskSlotsAvailable, taskSlotsTotal, numRegisteredTaskManagers and 
> numRunningJobs. Important other metrics would be numFailedJobs, 
> numCancelledJobs and numFinishedJobs. Also to get parity between JobManager 
> metrics and what's available via the REST API it would be good to have these.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
