Repository: helix Updated Branches: refs/heads/helix-0.6.x adfe4dda8 -> f5ac8f8b9
Monitors for Task framework 1. Add workflow and job monitor MBeans and implementations. 2. Add tests for MBean existing checking. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f5ac8f8b Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f5ac8f8b Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f5ac8f8b Branch: refs/heads/helix-0.6.x Commit: f5ac8f8b9e54b2457f39197b6baecfbb26e208a2 Parents: adfe4dd Author: Junkai Xue <[email protected]> Authored: Thu Sep 1 17:11:22 2016 -0700 Committer: Junkai Xue <[email protected]> Committed: Wed Sep 7 15:30:16 2016 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 6 + .../stages/BestPossibleStateCalcStage.java | 10 ++ .../monitoring/mbeans/ClusterStatusMonitor.java | 134 ++++++++++++++++++- .../helix/monitoring/mbeans/JobMonitor.java | 118 ++++++++++++++++ .../monitoring/mbeans/JobMonitorMBean.java | 64 +++++++++ .../monitoring/mbeans/WorkflowMonitor.java | 116 ++++++++++++++++ .../monitoring/mbeans/WorkflowMonitorMBean.java | 64 +++++++++ .../org/apache/helix/task/JobRebalancer.java | 9 +- .../org/apache/helix/task/TaskRebalancer.java | 8 ++ .../apache/helix/task/WorkflowRebalancer.java | 12 ++ 10 files changed, 539 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 4830e7a..fb30f0d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -28,6 +28,8 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicReference; +import javax.management.MalformedObjectNameException; + import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.ConfigChangeListener; import org.apache.helix.ControllerChangeListener; @@ -65,6 +67,7 @@ import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.PauseSignal; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.task.TaskDriver; import org.apache.log4j.Logger; /** @@ -270,6 +273,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC } else { if (_clusterStatusMonitor == null) { _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName()); + TaskDriver driver = new TaskDriver(manager); + _clusterStatusMonitor.setWorkflowsStatus(driver); + _clusterStatusMonitor.setJobsStatus(driver); } event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor); http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index b24507c..2721f7d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -35,6 +35,9 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import 
org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.task.JobRebalancer; +import org.apache.helix.task.TaskRebalancer; +import org.apache.helix.task.WorkflowRebalancer; import org.apache.helix.util.HelixUtil; import org.apache.log4j.Logger; @@ -149,6 +152,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { default: break; } + + if (rebalancer instanceof TaskRebalancer) { + TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer); + taskRebalancer.setClusterStatusMonitor( + (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor")); + } + if (rebalancer != null && mappingCalculator != null) { try { HelixManager manager = event.getAttribute("helixmanager"); http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index b4b5c01..55b774d 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -40,6 +39,11 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.WorkflowConfig; +import 
org.apache.helix.task.WorkflowContext; import org.apache.log4j.Logger; import com.google.common.collect.Maps; @@ -56,6 +60,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { static final String RESOURCE_DN_KEY = "resourceName"; static final String INSTANCE_DN_KEY = "instanceName"; static final String MESSAGE_QUEUE_DN_KEY = "messageQueue"; + static final String WORKFLOW_TYPE_DN_KEY = "workflowType"; + static final String JOB_TYPE_DN_KEY = "jobType"; + static final String DEFAULT_WORKFLOW_JOB_TYPE = "DEFAULT"; public static final String DEFAULT_TAG = "DEFAULT"; @@ -80,6 +87,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap = new ConcurrentHashMap<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>(); + private final Map<String, WorkflowMonitor> _perTypeWorkflowMonitorMap = + new ConcurrentHashMap<String, WorkflowMonitor>(); + + private final Map<String, JobMonitor> _perTypeJobMonitorMap = + new ConcurrentHashMap<String, JobMonitor>(); + public ClusterStatusMonitor(String clusterName) { _clusterName = clusterName; _beanServer = ManagementFactory.getPlatformMBeanServer(); @@ -373,11 +386,81 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { unregisterPerInstanceResources(_perInstanceResourceMap.keySet()); unregister(getObjectName(clusterBeanName())); + + unregisterWorkflows(_perTypeWorkflowMonitorMap.keySet()); + unregisterJobs(_perTypeJobMonitorMap.keySet()); } catch (Exception e) { LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e); } } + public void setWorkflowsStatus(TaskDriver driver) { + Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows(); + for (String workflow : workflowConfigMap.keySet()) { + if (workflowConfigMap.get(workflow).isRecurring()) { + continue; + } + WorkflowContext workflowContext = 
driver.getWorkflowContext(workflow); + TaskState toState = + workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getWorkflowState(); + updateWorkflowStatus(workflowConfigMap.get(workflow), null, toState); + } + } + + public void updateWorkflowStatus(WorkflowConfig workflowConfig, TaskState from, TaskState to) { + String workflowType = workflowConfig.getWorkflowType(); + if (workflowType == null || workflowType.length() == 0) { + workflowType = DEFAULT_WORKFLOW_JOB_TYPE; + } + + if (!_perTypeWorkflowMonitorMap.containsKey(workflowType)) { + WorkflowMonitor monitor = new WorkflowMonitor(_clusterName, workflowType); + try { + registerWorkflow(monitor); + } catch (MalformedObjectNameException e) { + LOG.error("Failed to register object for workflow type : " + workflowType, e); + } + _perTypeWorkflowMonitorMap.put(workflowType, monitor); + } + + _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowStats(from, to); + } + + public void setJobsStatus(TaskDriver driver) { + for (String workflow : driver.getWorkflows().keySet()) { + Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes(); + WorkflowContext workflowContext = driver.getWorkflowContext(workflow); + + for (String job : allJobs) { + TaskState toState = null; + if (workflowContext != null) { + toState = workflowContext.getJobState(job); + } + toState = toState == null ? 
TaskState.NOT_STARTED : toState; + updateJobStatus(driver.getJobConfig(job), null, toState); + } + } + } + + public void updateJobStatus(JobConfig jobConfig, TaskState from, TaskState to) { + String jobType = jobConfig.getJobType(); + if (jobType == null || jobType.length() == 0) { + jobType = DEFAULT_WORKFLOW_JOB_TYPE; + } + + if (!_perTypeJobMonitorMap.containsKey(jobType)) { + JobMonitor monitor = new JobMonitor(_clusterName, jobType); + try { + registerJob(monitor); + } catch (MalformedObjectNameException e) { + LOG.error("Failed to register job type : " + jobType, e); + } + _perTypeJobMonitorMap.put(jobType, monitor); + } + + _perTypeJobMonitorMap.get(jobType).updateJobStats(from, to); + } + private synchronized void registerInstances(Collection<InstanceMonitor> instances) throws MalformedObjectNameException { for (InstanceMonitor monitor : instances) { @@ -438,6 +521,35 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { _perInstanceResourceMap.keySet().removeAll(beanNames); } + private synchronized void registerWorkflow(WorkflowMonitor workflowMonitor) + throws MalformedObjectNameException { + String workflowBeanName = getWorkflowBeanName(workflowMonitor.getWorkflowType()); + register(workflowMonitor, getObjectName(workflowBeanName)); + } + + private synchronized void unregisterWorkflows(Collection<String> workflowMonitors) + throws MalformedObjectNameException { + for (String workflowMonitor : workflowMonitors) { + String workflowBeanName = getWorkflowBeanName(workflowMonitor); + unregister(getObjectName(workflowBeanName)); + _perTypeWorkflowMonitorMap.remove(workflowMonitor); + } + } + + private synchronized void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException { + String jobBeanName = getJobBeanName(jobMonitor.getJobType()); + register(jobMonitor, getObjectName(jobBeanName)); + } + + private synchronized void unregisterJobs(Collection<String> jobMonitors) + throws MalformedObjectNameException { + for (String 
jobMonitor : jobMonitors) { + String jobBeanName = getJobBeanName(jobMonitor); + unregister(getObjectName(jobBeanName)); + _perTypeJobMonitorMap.remove(jobMonitor); + } + } + public String clusterBeanName() { return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName); } @@ -472,6 +584,26 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { instanceName, resourceName).toString()); } + /** + * Build workflow per type bean name + * "cluster={clusterName}, workflowType={workflowType}" + * @param workflowType The workflow type + * @return per workflow type bean name + */ + public String getWorkflowBeanName(String workflowType) { + return String.format("%s, %s=%s", clusterBeanName(), WORKFLOW_TYPE_DN_KEY, workflowType); + } + + /** + * Build job per type bean name + * "cluster={clusterName}, jobType={jobType}" + * @param jobType The job type + * @return per job type bean name + */ + public String getJobBeanName(String jobType) { + return String.format("%s, %s=%s", clusterBeanName(), JOB_TYPE_DN_KEY, jobType); + } + @Override public String getSensorName() { return CLUSTER_STATUS_KEY + "." + _clusterName; http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java new file mode 100644 index 0000000..5754b8d --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java @@ -0,0 +1,118 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.task.TaskState; + +public class JobMonitor implements JobMonitorMBean { + + private static final String JOB_KEY = "Job"; + + private String _clusterName; + private String _jobType; + + private long _allJobCount; + private long _successfullJobCount; + private long _failedJobCount; + private long _existingJobGauge; + private long _queuedJobGauge; + private long _runningJobGauge; + + public JobMonitor(String clusterName, String jobType) { + _clusterName = clusterName; + _jobType = jobType; + _allJobCount = 0L; + _successfullJobCount = 0L; + _failedJobCount = 0L; + _existingJobGauge = 0L; + _queuedJobGauge = 0L; + _runningJobGauge = 0L; + } + + @Override + public long getAllJobCount() { + return _allJobCount; + } + + @Override + public long getSuccessfulJobCount() { + return _successfullJobCount; + } + + @Override + public long getFailedJobCount() { + return _failedJobCount; + } + + @Override + public long getExistingJobGauge() { + return _existingJobGauge; + } + + @Override + public long getQueuedJobGauge() { + return _queuedJobGauge; + } + + @Override + public long getRunningJobGauge() { + return _runningJobGauge; + } + + @Override + public String getSensorName() { + return String.format("%s.%s.%s", _clusterName, JOB_KEY, _jobType); + } + + public String getJobType() { + return _jobType; + } + + /** + * Update job metrics with transition state + * @param 
from The from state of job, just created when it is null + * @param to The to state of job, cleaned by ZK when it is null + */ + public void updateJobStats(TaskState from, TaskState to) { + if (from == null) { + // From null means a new job has been created + _existingJobGauge++; + _queuedJobGauge++; + _allJobCount++; + } else if (from.equals(TaskState.NOT_STARTED)) { + // From NOT_STARTED means the queued job count has decreased by one + _queuedJobGauge--; + } else if (from.equals(TaskState.IN_PROGRESS)) { + // From IN_PROGRESS means the running job count has decreased by one + _runningJobGauge--; + } + + if (to == null) { + // To null means the job has been cleaned from ZK + _existingJobGauge--; + } else if (to.equals(TaskState.IN_PROGRESS)) { + _runningJobGauge++; + } else if (to.equals(TaskState.FAILED)) { + _failedJobCount++; + } else if (to.equals(TaskState.COMPLETED)) { + _successfullJobCount++; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java new file mode 100644 index 0000000..2685096 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java @@ -0,0 +1,64 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.monitoring.SensorNameProvider; + +/** + * Job monitor MBean for jobs, which are shared among jobs with the same type. + */ +public interface JobMonitorMBean extends SensorNameProvider { + + /** + * Get numbers of all job count + * @return + */ + public long getAllJobCount(); + + /** + * Get numbers of the succeeded jobs + * @return + */ + public long getSuccessfulJobCount(); + + /** + * Get numbers of failed jobs + * @return + */ + public long getFailedJobCount(); + + /** + * Get number of existing jobs registered + * @return + */ + public long getExistingJobGauge(); + + /** + * Get numbers of queued jobs, which are not running jobs + * @return + */ + public long getQueuedJobGauge(); + + /** + * Get numbers of running jobs + * @return + */ + public long getRunningJobGauge(); +} http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java new file mode 100644 index 0000000..631c650 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java @@ -0,0 +1,116 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.task.TaskState; + +public class WorkflowMonitor implements WorkflowMonitorMBean { + private static final String WORKFLOW_KEY = "Workflow"; + + private String _clusterName; + private String _workflowType; + + private long _allWorkflowCount; + private long _successfulWorkflowCount; + private long _failedWorkflowCount; + private long _existingWorkflowGauge; + private long _queuedWorkflowGauge; + private long _runningWorkflowGauge; + + + public WorkflowMonitor(String clusterName, String workflowType) { + _clusterName = clusterName; + _workflowType = workflowType; + _allWorkflowCount = 0L; + _successfulWorkflowCount = 0L; + _failedWorkflowCount = 0L; + _existingWorkflowGauge = 0L; + _queuedWorkflowGauge = 0L; + _runningWorkflowGauge = 0L; + } + + @Override + public long getAllWorkflowCount() { + return _allWorkflowCount; + } + + @Override + public long getSuccessfulWorkflowCount() { + return _successfulWorkflowCount; + } + + @Override + public long getFailedWorkflowCount() { + return _failedWorkflowCount; + } + + @Override + public long getExistingWorkflowGauge() { + return _existingWorkflowGauge; + } + + @Override + public long getQueuedWorkflowGauge() { + return _queuedWorkflowGauge; + } + + @Override + public long getRunningWorkflowGauge() { + 
return _runningWorkflowGauge; + } + + @Override public String getSensorName() { + return String.format("%s.%s.%s", _clusterName, WORKFLOW_KEY, _workflowType); + } + + public String getWorkflowType() { + return _workflowType; + } + /** + * Update workflow with transition state + * @param from The from state of a workflow, created when it is null + * @param to The to state of a workflow, cleaned by ZK when it is null + */ + public void updateWorkflowStats(TaskState from, TaskState to) { + if (from == null) { + // From null means a new workflow has been created + _allWorkflowCount++; + _queuedWorkflowGauge++; + _existingWorkflowGauge++; + } else if (from.equals(TaskState.NOT_STARTED)) { + // From NOT_STARTED means the queued workflow count has decreased by one + _queuedWorkflowGauge--; + } else if (from.equals(TaskState.IN_PROGRESS)) { + // From IN_PROGRESS means the running workflow count has decreased by one + _runningWorkflowGauge--; + } + + if (to == null) { + // To null means the workflow has been cleaned from ZK + _existingWorkflowGauge--; + } else if (to.equals(TaskState.IN_PROGRESS)) { + _runningWorkflowGauge++; + } else if (to.equals(TaskState.FAILED)) { + _failedWorkflowCount++; + } else if (to.equals(TaskState.COMPLETED)) { + _successfulWorkflowCount++; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java new file mode 100644 index 0000000..a8746ad --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java @@ -0,0 +1,64 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.monitoring.SensorNameProvider; + +/** + * Workflow monitor MBean for workflows, which are shared among workflows with the same type. + */ +public interface WorkflowMonitorMBean extends SensorNameProvider { + + /** + * Get number of all workflows registered + * @return + */ + public long getAllWorkflowCount(); + + /** + * Get number of succeeded workflows + * @return + */ + public long getSuccessfulWorkflowCount(); + + /** + * Get number of failed workflows + * @return + */ + public long getFailedWorkflowCount(); + + /** + * Get number of current existing workflows + * @return + */ + public long getExistingWorkflowGauge(); + + /** + * Get number of queued but not started workflows + * @return + */ + public long getQueuedWorkflowGauge(); + + /** + * Get number of running workflows + * @return + */ + public long getRunningWorkflowGauge(); +} http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index c181ba5..378ad95 100644 --- 
a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -64,6 +64,7 @@ public class JobRebalancer extends TaskRebalancer { LOG.error("Job configuration is NULL for " + jobName); return buildEmptyAssignment(jobName, currStateOutput); } + _clusterStatusMonitor.updateJobStatus(jobCfg, null, TaskState.NOT_STARTED); String workflowResource = jobCfg.getWorkflow(); // Fetch workflow configuration and context @@ -97,6 +98,7 @@ public class JobRebalancer extends TaskRebalancer { workflowResource, jobName, workflowState, jobState)); cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName); _scheduledRebalancer.removeScheduledRebalance(jobName); + _clusterStatusMonitor.updateJobStatus(jobCfg, jobState, null); return buildEmptyAssignment(jobName, currStateOutput); } @@ -158,7 +160,6 @@ public class JobRebalancer extends TaskRebalancer { LOG.debug("Job " + jobName + " new assignment " + Arrays .toString(newAssignment.getMappedPartitions().toArray())); - return newAssignment; } @@ -197,11 +198,15 @@ public class JobRebalancer extends TaskRebalancer { // Workflow has been stopped if all in progress jobs are stopped if (isWorkflowStopped(workflowCtx, workflowConfig)) { workflowCtx.setWorkflowState(TaskState.STOPPED); + _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.NOT_STARTED, TaskState.STOPPED); + } } else { workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS); // Workflow is in progress if any task is in progress workflowCtx.setWorkflowState(TaskState.IN_PROGRESS); + _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS); + } // Used to keep track of tasks that have already been assigned to instances. 
@@ -227,6 +232,7 @@ public class JobRebalancer extends TaskRebalancer { jobCtx.setInfo(failureMsg); markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx); markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false); + _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.IN_PROGRESS, TaskState.FAILED); return new ResourceAssignment(jobResource); } @@ -400,6 +406,7 @@ public class JobRebalancer extends TaskRebalancer { if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) { markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx); + _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.IN_PROGRESS, TaskState.COMPLETED); // remove IdealState of this job cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); } http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 6aaeb5f..22f91e7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -36,6 +36,7 @@ import org.apache.helix.PropertyKey; import org.apache.helix.controller.rebalancer.Rebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; @@ -54,6 +55,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // For connection management protected HelixManager _manager; protected static ScheduledRebalancer _scheduledRebalancer = new 
ScheduledRebalancer(); + protected ClusterStatusMonitor _clusterStatusMonitor; @Override public void init(HelixManager manager) { _manager = manager; @@ -368,4 +370,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } } } + /** + * Set the ClusterStatusMonitor for metrics update + */ + public void setClusterStatusMonitor(ClusterStatusMonitor clusterStatusMonitor) { + _clusterStatusMonitor = clusterStatusMonitor; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index b78ee7f..9a3f7d8 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -63,6 +63,7 @@ public class WorkflowRebalancer extends TaskRebalancer { workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext")); workflowCtx.setStartTime(System.currentTimeMillis()); LOG.debug("Workflow context is created for " + workflow); + _clusterStatusMonitor.updateWorkflowStatus(workflowCfg, null, TaskState.NOT_STARTED); } // Clean up if workflow marked for deletion @@ -70,6 +71,7 @@ public class WorkflowRebalancer extends TaskRebalancer { if (targetState == TargetState.DELETE) { LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context."); cleanupWorkflow(workflow, workflowCfg); + _clusterStatusMonitor.updateWorkflowStatus(workflowCfg, TaskState.COMPLETED, null); return buildEmptyAssignment(workflow, currStateOutput); } @@ -83,6 +85,8 @@ public class WorkflowRebalancer extends TaskRebalancer { if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx, workflowCfg)) { 
workflowCtx.setFinishTime(currentTime); + _clusterStatusMonitor + .updateWorkflowStatus(workflowCfg, TaskState.IN_PROGRESS, TaskState.COMPLETED); TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); } @@ -93,6 +97,8 @@ public class WorkflowRebalancer extends TaskRebalancer { if (workflowCtx.getFinishTime() + expiryTime <= currentTime) { LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context."); cleanupWorkflow(workflow, workflowCfg); + _clusterStatusMonitor + .updateWorkflowStatus(workflowCfg, TaskState.IN_PROGRESS, TaskState.FAILED); } else { // schedule future cleanup work long cleanupTime = workflowCtx.getFinishTime() + expiryTime; @@ -287,8 +293,12 @@ public class WorkflowRebalancer extends TaskRebalancer { try { // Start the cloned workflow driver.start(clonedWf); + _clusterStatusMonitor + .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS); } catch (Exception e) { LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e); + _clusterStatusMonitor + .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.FAILED); } // Persist workflow start regardless of success to avoid retrying and failing workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName); @@ -304,6 +314,8 @@ public class WorkflowRebalancer extends TaskRebalancer { if (scheduledTime > 0 && currentTime > scheduledTime) { _scheduledRebalancer.removeScheduledRebalance(workflow); } + _clusterStatusMonitor + .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS); return true; } } else {
