Add Workflow and Job latency metrics. To understand workflow and job execution patterns, this change adds latency metrics for monitoring them.
Only succeeded workflows and jobs will be recorded. Otherwise the data is not clean for workflow or job latency. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/52d3bb83 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/52d3bb83 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/52d3bb83 Branch: refs/heads/master Commit: 52d3bb83c6c73316099f94de4d732e8c36c7171d Parents: 4d2734e Author: Junkai Xue <[email protected]> Authored: Tue Jan 2 15:52:30 2018 -0800 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:33:07 2018 -0800 ---------------------------------------------------------------------- .../monitoring/mbeans/ClusterStatusMonitor.java | 13 ++++++-- .../helix/monitoring/mbeans/JobMonitor.java | 32 ++++++++++++++++++++ .../monitoring/mbeans/JobMonitorMBean.java | 12 ++++++++ .../monitoring/mbeans/WorkflowMonitor.java | 32 +++++++++++++++++++- .../monitoring/mbeans/WorkflowMonitorMBean.java | 12 ++++++++ .../org/apache/helix/task/JobRebalancer.java | 6 ++-- .../org/apache/helix/task/TaskRebalancer.java | 7 +++-- 7 files changed, 105 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 61f4ce1..2a99341 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -515,11 +515,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { 
updateWorkflowGauges(workflowConfigMap.get(workflow), currentState); } } - public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to) { + updateWorkflowCounters(workflowConfig, to, -1L); + } + + public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to, long latency) { String workflowType = workflowConfig.getWorkflowType(); workflowType = preProcessWorkflow(workflowType); - _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to); + _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to, latency); } private void updateWorkflowGauges(WorkflowConfig workflowConfig, TaskState current) { @@ -568,9 +571,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } public void updateJobCounters(JobConfig jobConfig, TaskState to) { + updateJobCounters(jobConfig, to, -1L); + } + + public void updateJobCounters(JobConfig jobConfig, TaskState to, long latency) { String jobType = jobConfig.getJobType(); jobType = preProcessJobMonitor(jobType); - _perTypeJobMonitorMap.get(jobType).updateJobCounters(to); + _perTypeJobMonitorMap.get(jobType).updateJobCounters(to, latency); } private void updateJobGauges(String jobType, TaskState current) { http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java index 91f0b73..39108cf 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java @@ -28,6 +28,7 @@ public class JobMonitor implements JobMonitorMBean { private static final String JOB_KEY = "Job"; private static final Logger LOG = LoggerFactory.getLogger(JobMonitor.class); + 
private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour private String _clusterName; private String _jobType; @@ -38,6 +39,9 @@ public class JobMonitor implements JobMonitorMBean { private long _existingJobGauge; private long _queuedJobGauge; private long _runningJobGauge; + private long _maximumJobLatencyGauge; + private long _jobLatencyCount; + private long _lastResetTime; public JobMonitor(String clusterName, String jobType) { _clusterName = clusterName; @@ -48,6 +52,9 @@ public class JobMonitor implements JobMonitorMBean { _existingJobGauge = 0L; _queuedJobGauge = 0L; _runningJobGauge = 0L; + _lastResetTime = System.currentTimeMillis(); + _jobLatencyCount = 0L; + _maximumJobLatencyGauge = 0L; } @Override @@ -81,6 +88,16 @@ public class JobMonitor implements JobMonitorMBean { } @Override + public long getMaximumJobLatencyGauge() { + return _maximumJobLatencyGauge; + } + + @Override + public long getJobLatencyCount() { + return _jobLatencyCount; + } + + @Override public String getSensorName() { return String.format("%s.%s.%s", _clusterName, JOB_KEY, _jobType); } @@ -93,15 +110,26 @@ public class JobMonitor implements JobMonitorMBean { * Update job counters with transition state * @param to The to state of job, cleaned by ZK when it is null */ + public void updateJobCounters(TaskState to) { + updateJobCounters(to, 0); + } + + public void updateJobCounters(TaskState to, long latency) { // TODO maybe use separate TIMED_OUT counter later if (to.equals(TaskState.FAILED) || to.equals(TaskState.TIMED_OUT)) { _failedJobCount++; } else if (to.equals(TaskState.COMPLETED)) { _successfullJobCount++; + + // Only count succeeded jobs + _maximumJobLatencyGauge = Math.max(_maximumJobLatencyGauge, latency); + _jobLatencyCount += latency > 0 ? 
latency : 0; } else if (to.equals(TaskState.ABORTED)) { _abortedJobCount++; } + + } /** @@ -111,6 +139,10 @@ public class JobMonitor implements JobMonitorMBean { _queuedJobGauge = 0L; _existingJobGauge = 0L; _runningJobGauge = 0L; + if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) { + _lastResetTime = System.currentTimeMillis(); + _maximumJobLatencyGauge = 0L; + } } /** http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java index 5d30ec9..23e4a93 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java @@ -61,4 +61,16 @@ public interface JobMonitorMBean extends SensorNameProvider { * @return */ public long getRunningJobGauge(); + + /** + * Get maximum latency of jobs running time. It will be cleared every hour + * @return + */ + public long getMaximumJobLatencyGauge(); + + /** + * Get job latency counter. 
+ * @return + */ + public long getJobLatencyCount(); } http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java index 00f75d4..dc3bc5a 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java @@ -23,6 +23,7 @@ import org.apache.helix.task.TaskState; public class WorkflowMonitor implements WorkflowMonitorMBean { private static final String WORKFLOW_KEY = "Workflow"; + private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour private String _clusterName; private String _workflowType; @@ -33,6 +34,9 @@ public class WorkflowMonitor implements WorkflowMonitorMBean { private long _existingWorkflowGauge; private long _queuedWorkflowGauge; private long _runningWorkflowGauge; + private long _totalWorkflowLatencyCount; + private long _maximumWorkflowLatencyGauge; + private long _lastResetTime; public WorkflowMonitor(String clusterName, String workflowType) { @@ -44,6 +48,9 @@ public class WorkflowMonitor implements WorkflowMonitorMBean { _existingWorkflowGauge = 0L; _queuedWorkflowGauge = 0L; _runningWorkflowGauge = 0L; + _totalWorkflowLatencyCount = 0L; + _maximumWorkflowLatencyGauge = 0L; + _lastResetTime = System.currentTimeMillis(); } @Override @@ -76,6 +83,16 @@ public class WorkflowMonitor implements WorkflowMonitorMBean { return _runningWorkflowGauge; } + @Override + public long getWorkflowLatencyCount() { + return _totalWorkflowLatencyCount; + } + + @Override + public long getMaximumWorkflowLatencyGauge() { + return _maximumWorkflowLatencyGauge; + } + @Override public String getSensorName() { return 
String.format("%s.%s.%s", _clusterName, WORKFLOW_KEY, _workflowType); } @@ -88,11 +105,20 @@ public class WorkflowMonitor implements WorkflowMonitorMBean { * Update workflow with transition state * @param to The to state of a workflow */ + public void updateWorkflowCounters(TaskState to) { - if (to.equals(TaskState.FAILED)) { + updateWorkflowCounters(to, 0); + } + + public void updateWorkflowCounters(TaskState to, long latency) { + if (to.equals(TaskState.FAILED)) { _failedWorkflowCount++; } else if (to.equals(TaskState.COMPLETED)) { _successfulWorkflowCount++; + + // Only record latency larger than 0 and succeeded workflows + _maximumWorkflowLatencyGauge = Math.max(_maximumWorkflowLatencyGauge, latency); + _totalWorkflowLatencyCount += latency > 0 ? latency : 0; } } @@ -104,6 +130,10 @@ public class WorkflowMonitor implements WorkflowMonitorMBean { _existingWorkflowGauge = 0L; _runningWorkflowGauge = 0L; _queuedWorkflowGauge = 0L; + if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) { + _lastResetTime = System.currentTimeMillis(); + _maximumWorkflowLatencyGauge = 0; + } } /** http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java index dcd633d..2558e5b 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java @@ -60,4 +60,16 @@ public interface WorkflowMonitorMBean extends SensorNameProvider { * @return */ public long getRunningWorkflowGauge(); + + /** + * Get workflow latency count + * @return + */ + public long getWorkflowLatencyCount(); + + /** + * Get maximum 
workflow latency gauge. It will be reset in 1 hour. + * @return + */ + public long getMaximumWorkflowLatencyGauge(); } http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 5a17c6b..51da264 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -432,7 +432,8 @@ public class JobRebalancer extends TaskRebalancer { if (isJobComplete(jobCtx, allPartitions, jobCfg)) { markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap()); - _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED); + _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED, + jobCtx.getFinishTime() - jobCtx.getStartTime()); _rebalanceScheduler.removeScheduledRebalance(jobResource); TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); return buildEmptyAssignment(jobResource, currStateOutput); @@ -620,8 +621,7 @@ public class JobRebalancer extends TaskRebalancer { jobContext.setPartitionState(pId, TaskPartitionState.TASK_ABORTED); } } - _clusterStatusMonitor - .updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED); + _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED); _rebalanceScheduler.removeScheduledRebalance(jobName); TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName); } http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 9890a0a..3d3f86e 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -82,6 +82,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { for (String jobToFail : cfg.getJobDag().getAllNodes()) { if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) { ctx.setJobState(jobToFail, TaskState.ABORTED); + // Skip aborted jobs latency since they are not accurate latency for job running time _clusterStatusMonitor .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED); } @@ -89,14 +90,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { return true; } } - if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED && jobState != TaskState.TIMED_OUT) { + if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED + && jobState != TaskState.TIMED_OUT) { incomplete = true; } } if (!incomplete && cfg.isTerminable()) { ctx.setWorkflowState(TaskState.COMPLETED); - _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED); + _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED, + ctx.getFinishTime() - ctx.getStartTime()); return true; }
