[
https://issues.apache.org/jira/browse/FLINK-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15618451#comment-15618451
]
ASF GitHub Bot commented on FLINK-4888:
---------------------------------------
Github user philippbussche commented on a diff in the pull request:
https://github.com/apache/flink/pull/2683#discussion_r85644418
--- Diff:
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
---
@@ -1828,6 +1828,33 @@ class JobManager(
jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new
Gauge[Long] {
override def getValue: Long = JobManager.this.currentJobs.size
})
+ jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numFailedJobs", new
Gauge[Long] {
+ override def getValue: Long = {
+ var failedJobs = 0
+ val ourJobs = createJobStatusOverview()
+ val future = (archive ?
RequestJobsOverview.getInstance())(timeout)
+ val archivedJobs : JobsOverview = Await.result(future,
timeout).asInstanceOf[JobsOverview]
+ failedJobs += ourJobs.getNumJobsFailed() +
archivedJobs.getNumJobsFailed()
+ failedJobs
+ }})
+ jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numCancelledJobs", new
Gauge[Long] {
+ override def getValue: Long = {
+ var cancelledJobs = 0
+ val ourJobs = createJobStatusOverview()
+ val future = (archive ?
RequestJobsOverview.getInstance())(timeout)
+ val archivedJobs : JobsOverview = Await.result(future,
timeout).asInstanceOf[JobsOverview]
+ cancelledJobs += ourJobs.getNumJobsCancelled() +
archivedJobs.getNumJobsCancelled()
+ cancelledJobs
+ }})
+ jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numFinishedJobs", new
Gauge[Long] {
+ override def getValue: Long = {
+ var finishedJobs = 0
+ val ourJobs = createJobStatusOverview()
+ val future = (archive ?
RequestJobsOverview.getInstance())(timeout)
+ val archivedJobs : JobsOverview = Await.result(future,
timeout).asInstanceOf[JobsOverview]
+ finishedJobs += ourJobs.getNumJobsFinished() +
archivedJobs.getNumJobsFinished()
+ finishedJobs
+ }})
--- End diff --
@zentol, @rmetzger I am not sure whether you meant something like the
following to work around the RPC calls. I think it would eliminate them
completely, although the changes would be a bit more "invasive".
So what we could do is, where we receive the job status change
```
case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
```
send a RemoveJob message that carries the newJobStatus (which could be
"cancelled", as far as I can see)
```
self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true,
newJobStatus))
```
Then, when handling the RemoveJob message, pass the status on to the method
that actually removes the job:
```
removeJob(graph.getJobID, clearPersistedJob, newJobStatus)
```
Finally, besides taking the job out of the currentJobs map, removeJob could
increment a counter for cancelled archived jobs (and likewise for failed and
finished jobs), or put the job into a separate map per terminal status. When
it is time to report the monitoring metrics, the gauges would then read these
counters/maps instead of making any RPC calls to the archive.
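To make the idea more concrete, here is a minimal, self-contained sketch. It is not the actual JobManager code: TerminalStatus, ArchivedJobCounters and JobTracker are hypothetical names I made up for illustration, and the job map is reduced to a plain String-keyed map. I am also assuming the gauges may be read from a different thread than the one that removes jobs, hence the AtomicLong counters.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical stand-ins for the terminal JobStatus values (not Flink's real enum).
sealed trait TerminalStatus
case object Failed extends TerminalStatus
case object Cancelled extends TerminalStatus
case object Finished extends TerminalStatus

// Hypothetical holder for per-status counts of jobs already removed from currentJobs.
// AtomicLong is used on the assumption that gauges are read from another thread.
final class ArchivedJobCounters {
  private val failed = new AtomicLong(0L)
  private val cancelled = new AtomicLong(0L)
  private val finished = new AtomicLong(0L)

  private def counterFor(status: TerminalStatus): AtomicLong = status match {
    case Failed    => failed
    case Cancelled => cancelled
    case Finished  => finished
  }

  def increment(status: TerminalStatus): Unit = {
    counterFor(status).incrementAndGet()
    ()
  }

  def get(status: TerminalStatus): Long = counterFor(status).get()
}

// Simplified model of the bookkeeping only; the real JobManager is an actor.
final class JobTracker {
  private val currentJobs = mutable.Map.empty[String, String] // jobID -> job name
  val archived = new ArchivedJobCounters

  def submit(jobID: String, name: String): Unit = currentJobs.update(jobID, name)

  // Mirrors the proposed removeJob(jobID, ..., newJobStatus): drop the job and
  // count it under its terminal status. Unknown job IDs are ignored.
  def removeJob(jobID: String, newJobStatus: TerminalStatus): Unit =
    if (currentJobs.remove(jobID).isDefined) archived.increment(newJobStatus)

  def numRunningJobs: Long = currentJobs.size.toLong
}
```

A gauge such as numCancelledJobs would then only need to return archived.get(Cancelled), with no ask/Await against the archive. One caveat of this sketch: the counters only cover jobs removed while this instance is alive, so they would start at zero again after a JobManager restart.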
What do you think?
> instantiated job manager metrics missing important job statistics
> ------------------------------------------------------------------
>
> Key: FLINK-4888
> URL: https://issues.apache.org/jira/browse/FLINK-4888
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.2
> Reporter: Philipp von dem Bussche
> Assignee: Philipp von dem Bussche
> Priority: Minor
>
> A jobmanager is currently (only) instantiated with the following metrics:
> taskSlotsAvailable, taskSlotsTotal, numRegisteredTaskManagers and
> numRunningJobs. Important other metrics would be numFailedJobs,
> numCancelledJobs and numFinishedJobs. Also, to get parity between JobManager
> metrics and what is available via the REST API, it would be good to have these.