Add Workflow and Job latency metrics

To understand workflow and job execution patterns, this change adds latency 
metrics for monitoring them.

Only successfully completed workflows and jobs are recorded; including any 
others would make the workflow and job latency data unclean.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/52d3bb83
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/52d3bb83
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/52d3bb83

Branch: refs/heads/master
Commit: 52d3bb83c6c73316099f94de4d732e8c36c7171d
Parents: 4d2734e
Author: Junkai Xue <[email protected]>
Authored: Tue Jan 2 15:52:30 2018 -0800
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:33:07 2018 -0800

----------------------------------------------------------------------
 .../monitoring/mbeans/ClusterStatusMonitor.java | 13 ++++++--
 .../helix/monitoring/mbeans/JobMonitor.java     | 32 ++++++++++++++++++++
 .../monitoring/mbeans/JobMonitorMBean.java      | 12 ++++++++
 .../monitoring/mbeans/WorkflowMonitor.java      | 32 +++++++++++++++++++-
 .../monitoring/mbeans/WorkflowMonitorMBean.java | 12 ++++++++
 .../org/apache/helix/task/JobRebalancer.java    |  6 ++--
 .../org/apache/helix/task/TaskRebalancer.java   |  7 +++--
 7 files changed, 105 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 61f4ce1..2a99341 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -515,11 +515,14 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
       updateWorkflowGauges(workflowConfigMap.get(workflow), currentState);
     }
   }
-
   public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState 
to) {
+    updateWorkflowCounters(workflowConfig, to, -1L);
+  }
+
+  public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState 
to, long latency) {
     String workflowType = workflowConfig.getWorkflowType();
     workflowType = preProcessWorkflow(workflowType);
-    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to);
+    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to, 
latency);
   }
 
   private void updateWorkflowGauges(WorkflowConfig workflowConfig, TaskState 
current) {
@@ -568,9 +571,13 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   }
 
   public void updateJobCounters(JobConfig jobConfig, TaskState to) {
+    updateJobCounters(jobConfig, to, -1L);
+  }
+
+  public void updateJobCounters(JobConfig jobConfig, TaskState to, long 
latency) {
     String jobType = jobConfig.getJobType();
     jobType = preProcessJobMonitor(jobType);
-    _perTypeJobMonitorMap.get(jobType).updateJobCounters(to);
+    _perTypeJobMonitorMap.get(jobType).updateJobCounters(to, latency);
   }
 
   private void updateJobGauges(String jobType, TaskState current) {

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 91f0b73..39108cf 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -28,6 +28,7 @@ public class JobMonitor implements JobMonitorMBean {
 
   private static final String JOB_KEY = "Job";
   private static final Logger LOG = LoggerFactory.getLogger(JobMonitor.class);
+  private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 
hour
 
   private String _clusterName;
   private String _jobType;
@@ -38,6 +39,9 @@ public class JobMonitor implements JobMonitorMBean {
   private long _existingJobGauge;
   private long _queuedJobGauge;
   private long _runningJobGauge;
+  private long _maximumJobLatencyGauge;
+  private long _jobLatencyCount;
+  private long _lastResetTime;
 
   public JobMonitor(String clusterName, String jobType) {
     _clusterName = clusterName;
@@ -48,6 +52,9 @@ public class JobMonitor implements JobMonitorMBean {
     _existingJobGauge = 0L;
     _queuedJobGauge = 0L;
     _runningJobGauge = 0L;
+    _lastResetTime = System.currentTimeMillis();
+    _jobLatencyCount = 0L;
+    _maximumJobLatencyGauge = 0L;
   }
 
   @Override
@@ -81,6 +88,16 @@ public class JobMonitor implements JobMonitorMBean {
   }
 
   @Override
+  public long getMaximumJobLatencyGauge() {
+    return _maximumJobLatencyGauge;
+  }
+
+  @Override
+  public long getJobLatencyCount() {
+    return _jobLatencyCount;
+  }
+
+  @Override
   public String getSensorName() {
     return String.format("%s.%s.%s", _clusterName, JOB_KEY, _jobType);
   }
@@ -93,15 +110,26 @@ public class JobMonitor implements JobMonitorMBean {
    * Update job counters with transition state
    * @param to The to state of job, cleaned by ZK when it is null
    */
+
   public void updateJobCounters(TaskState to) {
+    updateJobCounters(to, 0);
+  }
+
+  public void updateJobCounters(TaskState to, long latency) {
     // TODO maybe use separate TIMED_OUT counter later
     if (to.equals(TaskState.FAILED) || to.equals(TaskState.TIMED_OUT)) {
       _failedJobCount++;
     } else if (to.equals(TaskState.COMPLETED)) {
       _successfullJobCount++;
+
+      // Only count succeeded jobs
+      _maximumJobLatencyGauge = Math.max(_maximumJobLatencyGauge, latency);
+      _jobLatencyCount += latency > 0 ? latency : 0;
     } else if (to.equals(TaskState.ABORTED)) {
       _abortedJobCount++;
     }
+
+
   }
 
   /**
@@ -111,6 +139,10 @@ public class JobMonitor implements JobMonitorMBean {
     _queuedJobGauge = 0L;
     _existingJobGauge = 0L;
     _runningJobGauge = 0L;
+    if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < 
System.currentTimeMillis()) {
+      _lastResetTime = System.currentTimeMillis();
+      _maximumJobLatencyGauge = 0L;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
index 5d30ec9..23e4a93 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
@@ -61,4 +61,16 @@ public interface JobMonitorMBean extends SensorNameProvider {
    * @return
    */
   public long getRunningJobGauge();
+
+  /**
+   * Get maximum latency of jobs running time. It will be cleared every hour
+   * @return
+   */
+  public long getMaximumJobLatencyGauge();
+
+  /**
+   * Get job latency counter.
+   * @return
+   */
+  public long getJobLatencyCount();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
index 00f75d4..dc3bc5a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
@@ -23,6 +23,7 @@ import org.apache.helix.task.TaskState;
 
 public class WorkflowMonitor implements WorkflowMonitorMBean {
   private static final String WORKFLOW_KEY = "Workflow";
+  private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 
hour
 
   private String _clusterName;
   private String _workflowType;
@@ -33,6 +34,9 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
   private long _existingWorkflowGauge;
   private long _queuedWorkflowGauge;
   private long _runningWorkflowGauge;
+  private long _totalWorkflowLatencyCount;
+  private long _maximumWorkflowLatencyGauge;
+  private long _lastResetTime;
 
 
   public WorkflowMonitor(String clusterName, String workflowType) {
@@ -44,6 +48,9 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
     _existingWorkflowGauge = 0L;
     _queuedWorkflowGauge = 0L;
     _runningWorkflowGauge = 0L;
+    _totalWorkflowLatencyCount = 0L;
+    _maximumWorkflowLatencyGauge = 0L;
+    _lastResetTime = System.currentTimeMillis();
   }
 
   @Override
@@ -76,6 +83,16 @@ public class WorkflowMonitor implements WorkflowMonitorMBean 
{
     return _runningWorkflowGauge;
   }
 
+  @Override
+  public long getWorkflowLatencyCount() {
+    return _totalWorkflowLatencyCount;
+  }
+
+  @Override
+  public long getMaximumWorkflowLatencyGauge() {
+    return _maximumWorkflowLatencyGauge;
+  }
+
   @Override public String getSensorName() {
     return String.format("%s.%s.%s", _clusterName, WORKFLOW_KEY, 
_workflowType);
   }
@@ -88,11 +105,20 @@ public class WorkflowMonitor implements 
WorkflowMonitorMBean {
    * Update workflow with transition state
    * @param to The to state of a workflow
    */
+
   public void updateWorkflowCounters(TaskState to) {
-   if (to.equals(TaskState.FAILED)) {
+    updateWorkflowCounters(to, 0);
+  }
+
+  public void updateWorkflowCounters(TaskState to, long latency) {
+    if (to.equals(TaskState.FAILED)) {
       _failedWorkflowCount++;
     } else if (to.equals(TaskState.COMPLETED)) {
       _successfulWorkflowCount++;
+
+      // Only record latency larger than 0 and succeeded workflows
+      _maximumWorkflowLatencyGauge = Math.max(_maximumWorkflowLatencyGauge, 
latency);
+      _totalWorkflowLatencyCount += latency > 0 ? latency : 0;
     }
   }
 
@@ -104,6 +130,10 @@ public class WorkflowMonitor implements 
WorkflowMonitorMBean {
     _existingWorkflowGauge = 0L;
     _runningWorkflowGauge = 0L;
     _queuedWorkflowGauge = 0L;
+    if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < 
System.currentTimeMillis()) {
+      _lastResetTime = System.currentTimeMillis();
+      _maximumWorkflowLatencyGauge = 0;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
index dcd633d..2558e5b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
@@ -60,4 +60,16 @@ public interface WorkflowMonitorMBean extends 
SensorNameProvider {
    * @return
    */
   public long getRunningWorkflowGauge();
+
+  /**
+   * Get workflow latency count
+   * @return
+   */
+  public long getWorkflowLatencyCount();
+
+  /**
+   * Get maximum workflow latency gauge. It will be reset in 1 hour.
+   * @return
+   */
+  public long getMaximumWorkflowLatencyGauge();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 5a17c6b..51da264 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -432,7 +432,8 @@ public class JobRebalancer extends TaskRebalancer {
     if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
       markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx,
           cache.getJobConfigMap());
-      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
+      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
+          jobCtx.getFinishTime() - jobCtx.getStartTime());
       _rebalanceScheduler.removeScheduledRebalance(jobResource);
       TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), 
jobResource);
       return buildEmptyAssignment(jobResource, currStateOutput);
@@ -620,8 +621,7 @@ public class JobRebalancer extends TaskRebalancer {
         jobContext.setPartitionState(pId, TaskPartitionState.TASK_ABORTED);
       }
     }
-    _clusterStatusMonitor
-        .updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED);
+    _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), 
TaskState.FAILED);
     _rebalanceScheduler.removeScheduledRebalance(jobName);
     TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), 
jobName);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 9890a0a..3d3f86e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -82,6 +82,7 @@ public abstract class TaskRebalancer implements Rebalancer, 
MappingCalculator {
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
+              // Skip aborted jobs latency since they are not accurate latency 
for job running time
               _clusterStatusMonitor
                   .updateJobCounters(jobConfigMap.get(jobToFail), 
TaskState.ABORTED);
             }
@@ -89,14 +90,16 @@ public abstract class TaskRebalancer implements Rebalancer, 
MappingCalculator {
           return true;
         }
       }
-      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED && 
jobState != TaskState.TIMED_OUT) {
+      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
+          && jobState != TaskState.TIMED_OUT) {
         incomplete = true;
       }
     }
 
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
-      _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED);
+      _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED,
+          ctx.getFinishTime() - ctx.getStartTime());
       return true;
     }
 

Reply via email to