http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java index 8170ae2..496c2fc 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java @@ -40,12 +40,8 @@ public class ComponentNumSelector extends AbstractSelector { @Override public int compare(ResourceWorkerSlot o1, ResourceWorkerSlot o2) { // TODO Auto-generated method stub - int o1Num = - context.getComponentNumOnSupervisor(o1.getNodeId(), - name); - int o2Num = - context.getComponentNumOnSupervisor(o2.getNodeId(), - name); + int o1Num = context.getComponentNumOnSupervisor(o1.getNodeId(), name); + int o2Num = context.getComponentNumOnSupervisor(o2.getNodeId(), name); if (o1Num == o2Num) return 0; return o1Num > o2Num ? 1 : -1;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java index 49eb447..f7f4f5b 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java @@ -40,12 +40,8 @@ public class InputComponentNumSelector extends AbstractSelector { @Override public int compare(ResourceWorkerSlot o1, ResourceWorkerSlot o2) { // TODO Auto-generated method stub - int o1Num = - context.getInputComponentNumOnSupervisor( - o1.getNodeId(), name); - int o2Num = - context.getInputComponentNumOnSupervisor( - o2.getNodeId(), name); + int o1Num = context.getInputComponentNumOnSupervisor(o1.getNodeId(), name); + int o2Num = context.getInputComponentNumOnSupervisor(o2.getNodeId(), name); if (o1Num == o2Num) return 0; return o1Num > o2Num ? -1 : 1; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java index adc8b29..6ef5736 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java @@ -22,6 +22,5 @@ import java.util.List; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; public interface Selector { - public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result, - String name); + public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result, String name); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java index f01ee9a..8643d6a 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java @@ -21,8 +21,7 @@ import java.util.Comparator; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; -public abstract class WorkerComparator implements - Comparator<ResourceWorkerSlot> { +public abstract class WorkerComparator implements Comparator<ResourceWorkerSlot> { protected String name; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java index f81d072..33f762a 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java @@ -24,33 +24,39 @@ import java.util.Map.Entry; import java.util.Set; public class TaskAssignContext { - + private final Map<Integer, String> taskToComponent; + private final Map<String, List<ResourceWorkerSlot>> supervisorToWorker; private final Map<String, Set<String>> relationship; // Map<worker, Map<component name, assigned task num in this worker> - private final Map<ResourceWorkerSlot, Map<String, Integer>> workerToComponentNum = - new HashMap<ResourceWorkerSlot, Map<String, Integer>>(); + private final Map<ResourceWorkerSlot, Map<String, Integer>> workerToComponentNum = new HashMap<ResourceWorkerSlot, Map<String, Integer>>(); // Map<available worker, assigned task num in this worker> - private final Map<ResourceWorkerSlot, Integer> workerToTaskNum = - new HashMap<ResourceWorkerSlot, Integer>(); + private final Map<ResourceWorkerSlot, Integer> workerToTaskNum = new HashMap<ResourceWorkerSlot, Integer>(); - private final Map<String, ResourceWorkerSlot> HostPortToWorkerMap = - new HashMap<String, ResourceWorkerSlot>(); + private final Map<String, ResourceWorkerSlot> HostPortToWorkerMap = new HashMap<String, ResourceWorkerSlot>(); - public TaskAssignContext( - Map<String, List<ResourceWorkerSlot>> supervisorToWorker, - Map<String, Set<String>> relationship) { + public TaskAssignContext(Map<String, List<ResourceWorkerSlot>> supervisorToWorker, Map<String, Set<String>> relationship, Map<Integer, String> taskToComponent) { + this.taskToComponent = taskToComponent; this.supervisorToWorker = supervisorToWorker; this.relationship = relationship; - for (Entry<String, List<ResourceWorkerSlot>> entry : supervisorToWorker - .entrySet()) { + for (Entry<String, List<ResourceWorkerSlot>> entry : supervisorToWorker.entrySet()) { for (ResourceWorkerSlot worker : entry.getValue()) { - workerToTaskNum.put(worker, 0); + workerToTaskNum.put(worker, (worker.getTasks() != null ? worker.getTasks().size() : 0)); HostPortToWorkerMap.put(worker.getHostPort(), worker); + + if (worker.getTasks() != null) { + Map<String, Integer> componentToNum = new HashMap<String, Integer>(); + for (Integer taskId : worker.getTasks()) { + String componentId = taskToComponent.get(taskId); + int num = componentToNum.get(componentId) == null ? 0 : componentToNum.get(componentId); + componentToNum.put(componentId, ++num); + } + workerToComponentNum.put(worker, componentToNum); + } } } } @@ -115,8 +121,7 @@ public class TaskAssignContext { return result; } - public int getInputComponentNumOnWorker(ResourceWorkerSlot worker, - String name) { + public int getInputComponentNumOnWorker(ResourceWorkerSlot worker, String name) { int result = 0; for (String component : relationship.get(name)) result = result + this.getComponentNumOnWorker(worker, component); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java index 7131463..a359972 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java @@ -41,20 +41,15 @@ public class TaskScheduler { public static Logger LOG = LoggerFactory.getLogger(TaskScheduler.class); - public static final String ACKER_NAME = "__acker"; - private final TaskAssignContext taskContext; - private List<ResourceWorkerSlot> assignments = - new ArrayList<ResourceWorkerSlot>(); + private List<ResourceWorkerSlot> assignments = new ArrayList<ResourceWorkerSlot>(); private int workerNum; /** - * For balance purpose, default scheduler is trying to assign the same - * number of tasks to a worker. e.g. There are 4 tasks and 3 available - * workers. Each worker will be assigned one task first. And then one worker - * is chosen for the last one. + * For balance purpose, default scheduler is trying to assign the same number of tasks to a worker. e.g. There are 4 tasks and 3 available workers. Each + * worker will be assigned one task first. And then one worker is chosen for the last one. */ private int avgTaskNum; private int leftTaskNum; @@ -69,48 +64,91 @@ public class TaskScheduler { private Selector totalTaskNumSelector; - public TaskScheduler(DefaultTopologyAssignContext context, - Set<Integer> tasks, List<ResourceWorkerSlot> workers) { + public TaskScheduler(DefaultTopologyAssignContext context, Set<Integer> tasks, List<ResourceWorkerSlot> workers) { this.tasks = tasks; - LOG.info("Tasks " + tasks + " is going to be assigned in workers " - + workers); + LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers); this.context = context; this.taskContext = - new TaskAssignContext(this.buildSupervisorToWorker(workers), - Common.buildSpoutOutoputAndBoltInputMap(context)); + new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent()); this.componentSelector = new ComponentNumSelector(taskContext); - this.inputComponentSelector = - new InputComponentNumSelector(taskContext); + this.inputComponentSelector = new InputComponentNumSelector(taskContext); this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext); if (tasks.size() == 0) return; - setTaskNum(tasks.size(), workerNum); + if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){ + // warning ! it doesn't consider HA TM now!! + if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) { + assignForTopologyMaster(); + } + } + + int taskNum = tasks.size(); + Map<ResourceWorkerSlot, Integer> workerSlotIntegerMap = taskContext.getWorkerToTaskNum(); + Set<ResourceWorkerSlot> preAssignWorkers = new HashSet<ResourceWorkerSlot>(); + for (Entry<ResourceWorkerSlot, Integer> worker : workerSlotIntegerMap.entrySet()) { + if (worker.getValue() > 0) { + taskNum += worker.getValue(); + preAssignWorkers.add(worker.getKey()); + } + } + setTaskNum(taskNum, workerNum); + + // Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers. + // Remove the workers which have been assigned with enough workers. + for (ResourceWorkerSlot worker : preAssignWorkers) { + Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker); + if (doneWorkers != null) { + for (ResourceWorkerSlot doneWorker : doneWorkers) { + taskNum -= doneWorker.getTasks().size(); + workerNum--; + } + } + } + setTaskNum(taskNum, workerNum); // For Scale-out case, the old assignment should be kept. - if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE - && context.isReassign() == false) { - keepAssignment(context.getOldAssignment().getWorkers()); + if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) { + keepAssignment(taskNum, context.getOldAssignment().getWorkers()); } } - private void keepAssignment(Set<ResourceWorkerSlot> keepAssignments) { + private void keepAssignment(int taskNum, Set<ResourceWorkerSlot> keepAssignments) { Set<Integer> keepTasks = new HashSet<Integer>(); + ResourceWorkerSlot tmWorker = null; for (ResourceWorkerSlot worker : keepAssignments) { + if (worker.getTasks().contains(context.getTopologyMasterTaskId())) + tmWorker = worker; for (Integer taskId : worker.getTasks()) { if (tasks.contains(taskId)) { - ResourceWorkerSlot contextWorker = - taskContext.getWorker(worker); + ResourceWorkerSlot contextWorker = taskContext.getWorker(worker); if (contextWorker != null) { - String componentName = - context.getTaskToComponent().get(taskId); - updateAssignedTasksOfWorker(taskId, contextWorker); - updateComponentsNumOfWorker(componentName, - contextWorker); - keepTasks.add(taskId); + if (tmWorker != null && tmWorker.getTasks().contains(taskId) && context.getAssignSingleWorkerForTM() ) { + if (context.getTopologyMasterTaskId() == taskId){ + updateAssignedTasksOfWorker(taskId, contextWorker); + taskContext.getWorkerToTaskNum().remove(contextWorker); + contextWorker.getTasks().clear(); + contextWorker.getTasks().add(taskId); + assignments.add(contextWorker); + tasks.remove(taskId); + taskNum--; + workerNum--; + LOG.info("assignForTopologyMaster: " + contextWorker); + } + }else { + String componentName = context.getTaskToComponent().get(taskId); + updateAssignedTasksOfWorker(taskId, contextWorker); + updateComponentsNumOfWorker(componentName, contextWorker); + keepTasks.add(taskId); + } } } } } + if ( tmWorker != null){ + setTaskNum(taskNum, workerNum); + keepAssignments.remove(tmWorker); + } + // Try to find the workers which have been assigned too much tasks // If found, remove the workers from worker resource pool and update @@ -118,11 +156,9 @@ public class TaskScheduler { int doneAssignedTaskNum = 0; while (true) { boolean found = false; - Set<ResourceWorkerSlot> doneAssignedWorkers = - new HashSet<ResourceWorkerSlot>(); + Set<ResourceWorkerSlot> doneAssignedWorkers = new HashSet<ResourceWorkerSlot>(); for (ResourceWorkerSlot worker : keepAssignments) { - ResourceWorkerSlot contextWorker = - taskContext.getWorker(worker); + ResourceWorkerSlot contextWorker = taskContext.getWorker(worker); if (contextWorker != null && isTaskFullForWorker(contextWorker)) { found = true; workerNum--; @@ -135,7 +171,8 @@ public class TaskScheduler { } if (found) { - setTaskNum((tasks.size() - doneAssignedTaskNum), workerNum); + taskNum -= doneAssignedTaskNum; + setTaskNum(taskNum, workerNum); keepAssignments.removeAll(doneAssignedWorkers); } else { break; @@ -150,45 +187,89 @@ public class TaskScheduler { Set<Integer> tasks = worker.getTasks(); if (tasks != null) { - if ((leftTaskNum == 0 && tasks.size() >= avgTaskNum) - || (leftTaskNum > 0 && tasks.size() >= (avgTaskNum + 1))) { + if ((leftTaskNum <= 0 && tasks.size() >= avgTaskNum) || (leftTaskNum > 0 && tasks.size() >= (avgTaskNum + 1))) { ret = true; } } return ret; } + private Set<ResourceWorkerSlot> getRestAssignedWorkers() { + Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>(); + for (ResourceWorkerSlot worker : taskContext.getWorkerToTaskNum().keySet()) { + if (worker.getTasks() != null && worker.getTasks().size() > 0) { + ret.add(worker); + } + } + return ret; + } + public List<ResourceWorkerSlot> assign() { - if (tasks.size() == 0) + if (tasks.size() == 0) { + assignments.addAll(getRestAssignedWorkers()); return assignments; + } // Firstly, assign workers to the components which are configured // by "task.on.differ.node" Set<Integer> assignedTasks = assignForDifferNodeTask(); - // Assign for the tasks except acker + // Assign for the tasks except system tasks tasks.removeAll(assignedTasks); - Set<Integer> ackers = new HashSet<Integer>(); + Map<Integer, String> systemTasks = new HashMap<Integer, String>(); for (Integer task : tasks) { String name = context.getTaskToComponent().get(task); - if (name.equals(TaskScheduler.ACKER_NAME)) { - ackers.add(task); + if (Common.isSystemComponent(name)) { + systemTasks.put(task, name); continue; } assignForTask(name, task); } - - // At last, make the assignment for acker - for (Integer task : ackers) { - assignForTask(TaskScheduler.ACKER_NAME, task); + + /* + * At last, make the assignment for system component, e.g. acker, topology master... + */ + for (Entry<Integer, String> entry : systemTasks.entrySet()) { + assignForTask(entry.getValue(), entry.getKey()); } + + assignments.addAll(getRestAssignedWorkers()); return assignments; } + private void assignForTopologyMaster() { + int taskId = context.getTopologyMasterTaskId(); + + // Try to find a worker which is in a supervisor with most workers, + // to avoid the balance problem when the assignment for other workers. + ResourceWorkerSlot workerAssigned = null; + int workerNumOfSuperv = 0; + for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){ + List<ResourceWorkerSlot> workers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId()); + if (workers != null && workers.size() > workerNumOfSuperv) { + for (ResourceWorkerSlot worker : workers) { + Set<Integer> tasks = worker.getTasks(); + if (tasks == null || tasks.size() == 0) { + workerAssigned = worker; + workerNumOfSuperv = workers.size(); + break; + } + } + } + } + + if (workerAssigned == null) + throw new FailedAssignTopologyException("there's no enough workers for the assignment of topology master"); + updateAssignedTasksOfWorker(taskId, workerAssigned); + taskContext.getWorkerToTaskNum().remove(workerAssigned); + assignments.add(workerAssigned); + tasks.remove(taskId); + workerNum--; + LOG.info("assignForTopologyMaster, assignments=" + assignments); + } + private void assignForTask(String name, Integer task) { - ResourceWorkerSlot worker = - chooseWorker(name, new ArrayList<ResourceWorkerSlot>( - taskContext.getWorkerToTaskNum().keySet())); + ResourceWorkerSlot worker = chooseWorker(name, new ArrayList<ResourceWorkerSlot>(taskContext.getWorkerToTaskNum().keySet())); pushTaskToWorker(task, name, worker); } @@ -201,98 +282,97 @@ public class TaskScheduler { } for (Integer task : ret) { String name = context.getTaskToComponent().get(task); - ResourceWorkerSlot worker = - chooseWorker(name, getDifferNodeTaskWokers(name)); + ResourceWorkerSlot worker = chooseWorker(name, getDifferNodeTaskWokers(name)); + LOG.info("Due to task.on.differ.node, push task-{} to {}:{}", task, worker.getHostname(), worker.getPort()); pushTaskToWorker(task, name, worker); } return ret; } - private Map<String, List<ResourceWorkerSlot>> buildSupervisorToWorker( - List<ResourceWorkerSlot> workers) { - Map<String, List<ResourceWorkerSlot>> supervisorToWorker = - new HashMap<String, List<ResourceWorkerSlot>>(); + private Map<String, List<ResourceWorkerSlot>> buildSupervisorToWorker(List<ResourceWorkerSlot> workers) { + Map<String, List<ResourceWorkerSlot>> supervisorToWorker = new HashMap<String, List<ResourceWorkerSlot>>(); for (ResourceWorkerSlot worker : workers) { - if (worker.getTasks() == null || worker.getTasks().size() == 0) { - List<ResourceWorkerSlot> supervisor = - supervisorToWorker.get(worker.getNodeId()); - if (supervisor == null) { - supervisor = new ArrayList<ResourceWorkerSlot>(); - supervisorToWorker.put(worker.getNodeId(), supervisor); - } - supervisor.add(worker); - } else { - assignments.add(worker); + List<ResourceWorkerSlot> supervisor = supervisorToWorker.get(worker.getNodeId()); + if (supervisor == null) { + supervisor = new ArrayList<ResourceWorkerSlot>(); + supervisorToWorker.put(worker.getNodeId(), supervisor); } + supervisor.add(worker); } - this.workerNum = workers.size() - assignments.size(); + this.workerNum = workers.size(); return supervisorToWorker; } - private ResourceWorkerSlot chooseWorker(String name, - List<ResourceWorkerSlot> workers) { - List<ResourceWorkerSlot> result = - componentSelector.select(workers, name); + private ResourceWorkerSlot chooseWorker(String name, List<ResourceWorkerSlot> workers) { + List<ResourceWorkerSlot> result = componentSelector.select(workers, name); result = totalTaskNumSelector.select(result, name); - if (name.equals(TaskScheduler.ACKER_NAME)) + if (Common.isSystemComponent(name)) return result.iterator().next(); result = inputComponentSelector.select(result, name); return result.iterator().next(); } - private void pushTaskToWorker(Integer task, String name, - ResourceWorkerSlot worker) { + private void pushTaskToWorker(Integer task, String name, ResourceWorkerSlot worker) { LOG.debug("Push task-" + task + " to worker-" + worker.getPort()); int taskNum = updateAssignedTasksOfWorker(task, worker); + removeWorkerFromSrcPool(taskNum, worker); + + updateComponentsNumOfWorker(name, worker); + } + + private int updateAssignedTasksOfWorker(Integer task, ResourceWorkerSlot worker) { + int ret = 0; + Set<Integer> tasks = worker.getTasks(); + if (tasks == null) { + tasks = new HashSet<Integer>(); + worker.setTasks(tasks); + } + tasks.add(task); + + ret = taskContext.getWorkerToTaskNum().get(worker); + taskContext.getWorkerToTaskNum().put(worker, ++ret); + return ret; + } + + /* + * Remove the worker from source worker pool, if the worker is assigned with enough tasks, + */ + private Set<ResourceWorkerSlot> removeWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) { + Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>(); + if (leftTaskNum <= 0) { - if (taskNum == avgTaskNum) { + if (taskNum >= avgTaskNum) { taskContext.getWorkerToTaskNum().remove(worker); assignments.add(worker); + ret.add(worker); } } else { - if (taskNum == (avgTaskNum + 1)) { + if (taskNum > avgTaskNum ) { taskContext.getWorkerToTaskNum().remove(worker); - leftTaskNum--; + leftTaskNum = leftTaskNum -(taskNum -avgTaskNum); assignments.add(worker); + ret.add(worker); } if (leftTaskNum <= 0) { - List<ResourceWorkerSlot> needDelete = - new ArrayList<ResourceWorkerSlot>(); - for (Entry<ResourceWorkerSlot, Integer> entry : taskContext - .getWorkerToTaskNum().entrySet()) { + List<ResourceWorkerSlot> needDelete = new ArrayList<ResourceWorkerSlot>(); + for (Entry<ResourceWorkerSlot, Integer> entry : taskContext.getWorkerToTaskNum().entrySet()) { if (entry.getValue() == avgTaskNum) needDelete.add(entry.getKey()); } for (ResourceWorkerSlot workerToDelete : needDelete) { taskContext.getWorkerToTaskNum().remove(workerToDelete); assignments.add(workerToDelete); + ret.add(workerToDelete); } } } - updateComponentsNumOfWorker(name, worker); - } - - private int updateAssignedTasksOfWorker(Integer task, - ResourceWorkerSlot worker) { - int ret = 0; - Set<Integer> tasks = worker.getTasks(); - if (tasks == null) { - tasks = new HashSet<Integer>(); - worker.setTasks(tasks); - } - tasks.add(task); - - ret = taskContext.getWorkerToTaskNum().get(worker); - taskContext.getWorkerToTaskNum().put(worker, ++ret); return ret; } - private void updateComponentsNumOfWorker(String name, - ResourceWorkerSlot worker) { - Map<String, Integer> components = - taskContext.getWorkerToComponentNum().get(worker); + private void updateComponentsNumOfWorker(String name, ResourceWorkerSlot worker) { + Map<String, Integer> components = taskContext.getWorkerToComponentNum().get(worker); if (components == null) { components = new HashMap<String, Integer>(); taskContext.getWorkerToComponentNum().put(worker, components); @@ -308,11 +388,9 @@ public class TaskScheduler { if (taskNum >= 0 && workerNum > 0) { this.avgTaskNum = taskNum / workerNum; this.leftTaskNum = taskNum % workerNum; - LOG.debug("avgTaskNum=" + avgTaskNum + ", leftTaskNum=" - + leftTaskNum); + LOG.debug("avgTaskNum=" + avgTaskNum + ", leftTaskNum=" + leftTaskNum); } else { - LOG.debug("Illegal parameters, taskNum=" + taskNum + ", workerNum=" - + workerNum); + LOG.debug("Illegal parameters, taskNum=" + taskNum + ", workerNum=" + workerNum); } } @@ -320,15 +398,12 @@ public class TaskScheduler { List<ResourceWorkerSlot> workers = new ArrayList<ResourceWorkerSlot>(); workers.addAll(taskContext.getWorkerToTaskNum().keySet()); - for (Entry<String, List<ResourceWorkerSlot>> entry : taskContext - .getSupervisorToWorker().entrySet()) { + for (Entry<String, List<ResourceWorkerSlot>> entry : taskContext.getSupervisorToWorker().entrySet()) { if (taskContext.getComponentNumOnSupervisor(entry.getKey(), name) != 0) workers.removeAll(entry.getValue()); } if (workers.size() == 0) - throw new FailedAssignTopologyException( - "there's no enough supervisor for making component: " - + name + " 's tasks on different node"); + throw new FailedAssignTopologyException("there's no enough supervisor for making component: " + name + " 's tasks on different node"); return workers; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java index c85d723..08c4730 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java @@ -28,6 +28,7 @@ import backtype.storm.Config; import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.client.WorkerAssignment; +import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo; import com.alibaba.jstorm.schedule.TopologyAssignContext; import com.alibaba.jstorm.utils.FailedAssignTopologyException; @@ -35,353 +36,369 @@ import com.alibaba.jstorm.utils.NetWorkUtils; public class WorkerScheduler { - public static Logger LOG = LoggerFactory.getLogger(WorkerScheduler.class); - - private static WorkerScheduler instance; - - private WorkerScheduler() { - - } - - public static WorkerScheduler getInstance() { - if (instance == null) { - instance = new WorkerScheduler(); - } - return instance; - } - - public List<ResourceWorkerSlot> getAvailableWorkers( - DefaultTopologyAssignContext context, Set<Integer> needAssign, - int num) { - int workersNum = getWorkersNum(context, num); - if (workersNum == 0) { - throw new FailedAssignTopologyException("there's no enough worker"); - } - List<ResourceWorkerSlot> assignedWorkers = - new ArrayList<ResourceWorkerSlot>(); - // userdefine assignments - getRightWorkers( - context, - needAssign, - assignedWorkers, - workersNum, - getUserDefineWorkers(context, ConfigExtension - .getUserDefineAssignment(context.getStormConf()))); - // old assignments - if (ConfigExtension.isUseOldAssignment(context.getStormConf())) { - getRightWorkers(context, needAssign, assignedWorkers, workersNum, - context.getOldWorkers()); - } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE - && context.isReassign() == false) { - int cnt = 0; - for (ResourceWorkerSlot worker : context.getOldWorkers()) { - if (cnt < workersNum) { - ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot(); - resFreeWorker.setPort(worker.getPort()); - resFreeWorker.setHostname(worker.getHostname()); - resFreeWorker.setNodeId(worker.getNodeId()); - assignedWorkers.add(resFreeWorker); - cnt++; - } else { - break; - } - } - } - int defaultWorkerNum = - Math.min(workersNum - assignedWorkers.size(), needAssign.size()); - LOG.info("Get workers from user define and old assignments: " - + assignedWorkers); - for (int i = 0; i < defaultWorkerNum; i++) { - assignedWorkers.add(new ResourceWorkerSlot()); - } - List<SupervisorInfo> isolationSupervisors = - this.getIsolationSupervisors(context); - if (isolationSupervisors.size() != 0) { - putAllWorkerToSupervisor(assignedWorkers, - getResAvailSupervisors(isolationSupervisors)); - } else { - putAllWorkerToSupervisor(assignedWorkers, - getResAvailSupervisors(context.getCluster())); - } - this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers); - LOG.info("Assigned workers=" + assignedWorkers); - return assignedWorkers; - } - - private void setAllWorkerMemAndCpu(Map conf, - List<ResourceWorkerSlot> assignedWorkers) { - long defaultSize = ConfigExtension.getMemSizePerWorker(conf); - int defaultCpu = ConfigExtension.getCpuSlotPerWorker(conf); - for (ResourceWorkerSlot worker : assignedWorkers) { - if (worker.getMemSize() <= 0) - worker.setMemSize(defaultSize); - if (worker.getCpu() <= 0) - worker.setCpu(defaultCpu); - } - } - - private void putAllWorkerToSupervisor( - List<ResourceWorkerSlot> assignedWorkers, - List<SupervisorInfo> supervisors) { - for (ResourceWorkerSlot worker : assignedWorkers) { - if (worker.getHostname() != null) { - for (SupervisorInfo supervisor : supervisors) { - if (NetWorkUtils.equals(supervisor.getHostName(), - worker.getHostname()) - && supervisor.getAvailableWorkerPorts().size() > 0) { - putWorkerToSupervisor(supervisor, worker); - break; - } - } - } - } - supervisors = getResAvailSupervisors(supervisors); - Collections.sort(supervisors, new Comparator<SupervisorInfo>() { - - @Override - public int compare(SupervisorInfo o1, SupervisorInfo o2) { - // TODO Auto-generated method stub - return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2 - .getAvailableWorkerPorts().size()); - } - - }); - putWorkerToSupervisor(assignedWorkers, supervisors); - } - - private void putWorkerToSupervisor(SupervisorInfo supervisor, - ResourceWorkerSlot worker) { - int port = worker.getPort(); - if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) { - port = supervisor.getAvailableWorkerPorts().iterator().next(); - } - worker.setPort(port); - supervisor.getAvailableWorkerPorts().remove(port); - worker.setNodeId(supervisor.getSupervisorId()); - } - - private void putWorkerToSupervisor( - List<ResourceWorkerSlot> assignedWorkers, - List<SupervisorInfo> supervisors) { - int allUsedPorts = 0; - for (SupervisorInfo supervisor : supervisors) { - int supervisorUsedPorts = supervisor.getWorkerPorts().size() - - supervisor.getAvailableWorkerPorts().size(); - allUsedPorts = allUsedPorts + supervisorUsedPorts; - } - // per supervisor should be allocated ports in theory - int theoryAveragePorts = - (allUsedPorts + assignedWorkers.size()) / supervisors.size() - + 1; - // supervisor which use more than theoryAveragePorts ports will be - // pushed overLoadSupervisors - List<SupervisorInfo> overLoadSupervisors = - new ArrayList<SupervisorInfo>(); - int key = 0; - Iterator<ResourceWorkerSlot> iterator = assignedWorkers.iterator(); - while (iterator.hasNext()) { - if (supervisors.size() == 0) - break; - if (key >= supervisors.size()) - key = 0; - SupervisorInfo supervisor = supervisors.get(key); - int supervisorUsedPorts = supervisor.getWorkerPorts().size() - - supervisor.getAvailableWorkerPorts().size(); - if (supervisorUsedPorts < theoryAveragePorts) { - ResourceWorkerSlot worker = iterator.next(); - if (worker.getNodeId() != null) - continue; - worker.setHostname(supervisor.getHostName()); - worker.setNodeId(supervisor.getSupervisorId()); - worker.setPort( - supervisor.getAvailableWorkerPorts().iterator().next()); - supervisor.getAvailableWorkerPorts().remove(worker.getPort()); - if (supervisor.getAvailableWorkerPorts().size() == 0) - supervisors.remove(supervisor); - key++; - } else { - overLoadSupervisors.add(supervisor); - supervisors.remove(supervisor); - } - } - // rest assignedWorkers will be allocate supervisor by deal - Collections.sort(overLoadSupervisors, new Comparator<SupervisorInfo>() { - - @Override - public int compare(SupervisorInfo o1, SupervisorInfo o2) { - // TODO Auto-generated method stub - return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), - o2.getAvailableWorkerPorts().size()); - } - - }); - key = 0; - while (iterator.hasNext()) { - if (overLoadSupervisors.size() == 0) - break; - if (key >= overLoadSupervisors.size()) - key = 0; - ResourceWorkerSlot worker = iterator.next(); - if (worker.getNodeId() != null) - continue; - SupervisorInfo supervisor = overLoadSupervisors.get(key); - worker.setHostname(supervisor.getHostName()); - worker.setNodeId(supervisor.getSupervisorId()); - worker.setPort( - supervisor.getAvailableWorkerPorts().iterator().next()); - supervisor.getAvailableWorkerPorts().remove(worker.getPort()); - if (supervisor.getAvailableWorkerPorts().size() == 0) - overLoadSupervisors.remove(supervisor); - key++; - } - } - - private void getRightWorkers(DefaultTopologyAssignContext context, - Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers, - int workersNum, Collection<ResourceWorkerSlot> workers) { - Set<Integer> assigned = new HashSet<Integer>(); - List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>(); - if (workers == null) - return; - for (ResourceWorkerSlot worker : workers) { - boolean right = true; - Set<Integer> tasks = worker.getTasks(); - if (tasks == null) - continue; - for (Integer task : tasks) { - if (!needAssign.contains(task) || assigned.contains(task)) { - right = false; - break; - } - } - if (right) { - assigned.addAll(tasks); - users.add(worker); - } - } - if (users.size() + assignedWorkers.size() > workersNum) { - return; - } - - if (users.size() + assignedWorkers.size() == workersNum - && assigned.size() != needAssign.size()) { - return; - } - assignedWorkers.addAll(users); - needAssign.removeAll(assigned); - } - - private int getWorkersNum(DefaultTopologyAssignContext context, - int workersNum) { - Map<String, SupervisorInfo> supervisors = context.getCluster(); - List<SupervisorInfo> isolationSupervisors = - this.getIsolationSupervisors(context); - int slotNum = 0; - - if (isolationSupervisors.size() != 0) { - for (SupervisorInfo superivsor : isolationSupervisors) { - slotNum = slotNum + superivsor.getAvailableWorkerPorts().size(); - } - return Math.min(slotNum, workersNum); - } - for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) { - slotNum = slotNum + entry.getValue().getAvailableWorkerPorts().size(); - } - return Math.min(slotNum, workersNum); - } - - /** - * @param context - * @param workers - * @return - */ - private List<ResourceWorkerSlot> getUserDefineWorkers( - DefaultTopologyAssignContext context, List<WorkerAssignment> workers) { - List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>(); - if (workers == null) - return ret; - Map<String, List<Integer>> componentToTask = - (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context - .getComponentTasks()).clone(); - if (context.getAssignType() != context.ASSIGN_TYPE_NEW) { - checkUserDefineWorkers(context, workers, - context.getTaskToComponent()); - } - for (WorkerAssignment worker : workers) { - ResourceWorkerSlot workerSlot = - new ResourceWorkerSlot(worker, componentToTask); - if (workerSlot.getTasks().size() != 0) { - ret.add(workerSlot); - } - } - return ret; - } - - private void checkUserDefineWorkers(DefaultTopologyAssignContext context, - List<WorkerAssignment> workers, Map<Integer, String> taskToComponent) { - Set<ResourceWorkerSlot> unstoppedWorkers = - context.getUnstoppedWorkers(); - List<WorkerAssignment> re = new ArrayList<WorkerAssignment>(); - for (WorkerAssignment worker : workers) { - for (ResourceWorkerSlot unstopped : unstoppedWorkers) { - if (unstopped - .compareToUserDefineWorker(worker, taskToComponent)) - re.add(worker); - } - } - workers.removeAll(re); - - } - - private List<SupervisorInfo> getResAvailSupervisors( - Map<String, SupervisorInfo> supervisors) { - List<SupervisorInfo> availableSupervisors = - new ArrayList<SupervisorInfo>(); - for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) { - SupervisorInfo supervisor = entry.getValue(); - if (supervisor.getAvailableWorkerPorts().size() > 0) - availableSupervisors.add(entry.getValue()); - } - return availableSupervisors; - } - - private List<SupervisorInfo> getResAvailSupervisors( - List<SupervisorInfo> supervisors) { - List<SupervisorInfo> availableSupervisors = - new ArrayList<SupervisorInfo>(); - for (SupervisorInfo supervisor : supervisors) { - if (supervisor.getAvailableWorkerPorts().size() > 0) - availableSupervisors.add(supervisor); - } - return availableSupervisors; - } - - private List<SupervisorInfo> getIsolationSupervisors( - DefaultTopologyAssignContext context) { - List<String> isolationHosts = - (List<String>) context.getStormConf().get( - Config.ISOLATION_SCHEDULER_MACHINES); - LOG.info("Isolation machines: " + isolationHosts); - if (isolationHosts == null) - return new ArrayList<SupervisorInfo>(); - List<SupervisorInfo> isolationSupervisors = - new ArrayList<SupervisorInfo>(); - for (Entry<String, SupervisorInfo> entry : context.getCluster() - .entrySet()) { - if (containTargetHost(isolationHosts, entry.getValue() - .getHostName())) { - isolationSupervisors.add(entry.getValue()); - } - } - return isolationSupervisors; - } - - private boolean containTargetHost(Collection<String> hosts, String target) { - for (String host : hosts) { - if (NetWorkUtils.equals(host, target) == true) { - return true; - } - } - return false; - } + public static Logger LOG = LoggerFactory.getLogger(WorkerScheduler.class); + + private static WorkerScheduler instance; + + private WorkerScheduler() { + + } + + public static WorkerScheduler getInstance() { + if (instance == null) { + instance = new WorkerScheduler(); + } + return instance; + } + + public List<ResourceWorkerSlot> getAvailableWorkers( + DefaultTopologyAssignContext context, Set<Integer> needAssign, + int allocWorkerNum) { + int workersNum = getAvailableWorkersNum(context); + if (workersNum < allocWorkerNum) { + throw new FailedAssignTopologyException( + "there's no enough worker. allocWorkerNum=" + + allocWorkerNum + ", availableWorkerNum=" + + workersNum); + } + workersNum = allocWorkerNum; + List<ResourceWorkerSlot> assignedWorkers = new ArrayList<ResourceWorkerSlot>(); + // userdefine assignments, but dont't try to use custom scheduling for + // TM bolts now. + getRightWorkers( + context, + needAssign, + assignedWorkers, + workersNum, + getUserDefineWorkers(context, ConfigExtension + .getUserDefineAssignment(context.getStormConf()))); + + // old assignments + if (ConfigExtension.isUseOldAssignment(context.getStormConf())) { + getRightWorkers(context, needAssign, assignedWorkers, workersNum, + context.getOldWorkers()); + } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE + && context.isReassign() == false) { + int cnt = 0; + for (ResourceWorkerSlot worker : context.getOldWorkers()) { + if (cnt < workersNum) { + ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot(); + resFreeWorker.setPort(worker.getPort()); + resFreeWorker.setHostname(worker.getHostname()); + resFreeWorker.setNodeId(worker.getNodeId()); + assignedWorkers.add(resFreeWorker); + cnt++; + } else { + break; + } + } + } + // calculate rest TM bolts + int workersForSingleTM = 0; + if (context.getAssignSingleWorkerForTM()) { + for (Integer taskId : needAssign) { + String componentName = context.getTaskToComponent().get(taskId); + if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { + workersForSingleTM++; + } + } + } + + LOG.info("Get workers from user define and old assignments: " + + assignedWorkers); + + int restWokerNum = workersNum - assignedWorkers.size(); + if (restWokerNum < 0) + throw new FailedAssignTopologyException( + "Too much workers are needed for user define or old assignments. workersNum=" + + workersNum + ", assignedWokersNum=" + + assignedWorkers.size()); + + for (int i = 0; i < restWokerNum; i++) { + assignedWorkers.add(new ResourceWorkerSlot()); + } + List<SupervisorInfo> isolationSupervisors = this + .getIsolationSupervisors(context); + if (isolationSupervisors.size() != 0) { + putAllWorkerToSupervisor(assignedWorkers, + getResAvailSupervisors(isolationSupervisors)); + } else { + putAllWorkerToSupervisor(assignedWorkers, + getResAvailSupervisors(context.getCluster())); + } + this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers); + LOG.info("Assigned workers=" + assignedWorkers); + return assignedWorkers; + } + + private void setAllWorkerMemAndCpu(Map conf, + List<ResourceWorkerSlot> assignedWorkers) { + long defaultSize = ConfigExtension.getMemSizePerWorker(conf); + int defaultCpu = ConfigExtension.getCpuSlotPerWorker(conf); + for (ResourceWorkerSlot worker : assignedWorkers) { + if (worker.getMemSize() <= 0) + worker.setMemSize(defaultSize); + if (worker.getCpu() <= 0) + worker.setCpu(defaultCpu); + } + } + + private void putAllWorkerToSupervisor( + List<ResourceWorkerSlot> assignedWorkers, + List<SupervisorInfo> supervisors) { + for (ResourceWorkerSlot worker : assignedWorkers) { + if (worker.getHostname() != null) { + for (SupervisorInfo supervisor : supervisors) { + if (NetWorkUtils.equals(supervisor.getHostName(), + worker.getHostname()) + && supervisor.getAvailableWorkerPorts().size() > 0) { + putWorkerToSupervisor(supervisor, worker); + break; + } + } + } + } + supervisors = getResAvailSupervisors(supervisors); + Collections.sort(supervisors, new Comparator<SupervisorInfo>() { + + @Override + public int compare(SupervisorInfo o1, SupervisorInfo o2) { + // TODO Auto-generated method stub + return -NumberUtils.compare( + o1.getAvailableWorkerPorts().size(), o2 + .getAvailableWorkerPorts().size()); + } + + }); + putWorkerToSupervisor(assignedWorkers, supervisors); + } + + private void putWorkerToSupervisor(SupervisorInfo supervisor, + ResourceWorkerSlot worker) { + int port = worker.getPort(); + if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) { + port = supervisor.getAvailableWorkerPorts().iterator().next(); + } + worker.setPort(port); + supervisor.getAvailableWorkerPorts().remove(port); + worker.setNodeId(supervisor.getSupervisorId()); + } + + private void putWorkerToSupervisor( + List<ResourceWorkerSlot> assignedWorkers, + List<SupervisorInfo> supervisors) { + int allUsedPorts = 0; + for (SupervisorInfo supervisor : supervisors) { + int supervisorUsedPorts = supervisor.getWorkerPorts().size() + - supervisor.getAvailableWorkerPorts().size(); + allUsedPorts = allUsedPorts + supervisorUsedPorts; + } + // per supervisor should be allocated ports in theory + int theoryAveragePorts = (allUsedPorts + assignedWorkers.size()) + / supervisors.size() + 1; + // supervisor which use more than theoryAveragePorts ports will be + // pushed overLoadSupervisors + List<SupervisorInfo> overLoadSupervisors = new ArrayList<SupervisorInfo>(); + int key = 0; + Iterator<ResourceWorkerSlot> iterator = assignedWorkers.iterator(); + while (iterator.hasNext()) { + if (supervisors.size() == 0) + break; + if (key >= supervisors.size()) + key = 0; + SupervisorInfo supervisor = supervisors.get(key); + int supervisorUsedPorts = supervisor.getWorkerPorts().size() + - supervisor.getAvailableWorkerPorts().size(); + if (supervisorUsedPorts < theoryAveragePorts) { + ResourceWorkerSlot worker = iterator.next(); + if (worker.getNodeId() != null) + continue; + worker.setHostname(supervisor.getHostName()); + worker.setNodeId(supervisor.getSupervisorId()); + worker.setPort(supervisor.getAvailableWorkerPorts().iterator() + .next()); + supervisor.getAvailableWorkerPorts().remove(worker.getPort()); + if (supervisor.getAvailableWorkerPorts().size() == 0) + supervisors.remove(supervisor); + key++; + } else { + overLoadSupervisors.add(supervisor); + supervisors.remove(supervisor); + } + } + // rest assignedWorkers will be allocate supervisor by deal + Collections.sort(overLoadSupervisors, new Comparator<SupervisorInfo>() { + + @Override + public int compare(SupervisorInfo o1, SupervisorInfo o2) { + // TODO Auto-generated method stub + return -NumberUtils.compare( + o1.getAvailableWorkerPorts().size(), o2 + .getAvailableWorkerPorts().size()); + } + + }); + key = 0; + while (iterator.hasNext()) { + if (overLoadSupervisors.size() == 0) + break; + if (key >= overLoadSupervisors.size()) + key = 0; + ResourceWorkerSlot worker = iterator.next(); + if (worker.getNodeId() != null) + continue; + SupervisorInfo supervisor = overLoadSupervisors.get(key); + worker.setHostname(supervisor.getHostName()); + worker.setNodeId(supervisor.getSupervisorId()); + worker.setPort(supervisor.getAvailableWorkerPorts().iterator() + .next()); + supervisor.getAvailableWorkerPorts().remove(worker.getPort()); + if (supervisor.getAvailableWorkerPorts().size() == 0) + overLoadSupervisors.remove(supervisor); + key++; + } + } + + private void getRightWorkers(DefaultTopologyAssignContext context, + Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers, + int workersNum, Collection<ResourceWorkerSlot> workers) { + Set<Integer> assigned = new HashSet<Integer>(); + List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>(); + if (workers == null) + return; + for (ResourceWorkerSlot worker : workers) { + boolean right = true; + Set<Integer> tasks = worker.getTasks(); + if (tasks == null) + continue; + for (Integer task : tasks) { + if (!needAssign.contains(task) || assigned.contains(task)) { + right = false; + break; + } + } + if (right) { + assigned.addAll(tasks); + users.add(worker); + } + } + if (users.size() + assignedWorkers.size() > workersNum) { + LOG.warn( + "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}", + users, assignedWorkers, workersNum); + return; + } + + assignedWorkers.addAll(users); + needAssign.removeAll(assigned); + } + + private int getAvailableWorkersNum(DefaultTopologyAssignContext context) { + Map<String, SupervisorInfo> supervisors = context.getCluster(); + List<SupervisorInfo> isolationSupervisors = this + .getIsolationSupervisors(context); + int slotNum = 0; + + if (isolationSupervisors.size() != 0) { + for (SupervisorInfo superivsor : isolationSupervisors) { + slotNum = slotNum + superivsor.getAvailableWorkerPorts().size(); + } + } else { + for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) { + slotNum = slotNum + + entry.getValue().getAvailableWorkerPorts().size(); + } + } + return slotNum; + } + + /** + * @param context + * @param workers + * @return + */ + private List<ResourceWorkerSlot> getUserDefineWorkers( + DefaultTopologyAssignContext context, List<WorkerAssignment> workers) { + List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>(); + if (workers == null) + return ret; + Map<String, List<Integer>> componentToTask = (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context + .getComponentTasks()).clone(); + if (context.getAssignType() != context.ASSIGN_TYPE_NEW) { + checkUserDefineWorkers(context, workers, + context.getTaskToComponent()); + } + for (WorkerAssignment worker : workers) { + ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker, + componentToTask); + if (workerSlot.getTasks().size() != 0) { + ret.add(workerSlot); + } + } + return ret; + } + + private void checkUserDefineWorkers(DefaultTopologyAssignContext context, + List<WorkerAssignment> workers, Map<Integer, String> taskToComponent) { + Set<ResourceWorkerSlot> unstoppedWorkers = context + .getUnstoppedWorkers(); + List<WorkerAssignment> re = new ArrayList<WorkerAssignment>(); + for (WorkerAssignment worker : workers) { + for (ResourceWorkerSlot unstopped : unstoppedWorkers) { + if (unstopped + .compareToUserDefineWorker(worker, taskToComponent)) + re.add(worker); + } + } + workers.removeAll(re); + + } + + private List<SupervisorInfo> getResAvailSupervisors( + Map<String, SupervisorInfo> supervisors) { + List<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>(); + for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) { + SupervisorInfo supervisor = entry.getValue(); + if (supervisor.getAvailableWorkerPorts().size() > 0) + availableSupervisors.add(entry.getValue()); + } + return availableSupervisors; + } + + private List<SupervisorInfo> getResAvailSupervisors( + List<SupervisorInfo> supervisors) { + List<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>(); + for (SupervisorInfo supervisor : supervisors) { + if (supervisor.getAvailableWorkerPorts().size() > 0) + availableSupervisors.add(supervisor); + } + return availableSupervisors; + } + + private List<SupervisorInfo> getIsolationSupervisors( + DefaultTopologyAssignContext context) { + List<String> isolationHosts = (List<String>) context.getStormConf() + .get(Config.ISOLATION_SCHEDULER_MACHINES); + LOG.info("Isolation machines: " + isolationHosts); + if (isolationHosts == null) + return new ArrayList<SupervisorInfo>(); + List<SupervisorInfo> isolationSupervisors = new ArrayList<SupervisorInfo>(); + for (Entry<String, SupervisorInfo> entry : context.getCluster() + .entrySet()) { + if (containTargetHost(isolationHosts, entry.getValue() + .getHostName())) { + isolationSupervisors.add(entry.getValue()); + } + } + return isolationSupervisors; + } + + private boolean containTargetHost(Collection<String> hosts, String target) { + for (String host : hosts) { + if (NetWorkUtils.equals(host, target) == true) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java index 6481c5e..b0fdc92 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java @@ -17,31 +17,20 @@ */ package com.alibaba.jstorm.task; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; import backtype.storm.messaging.IContext; import backtype.storm.serialization.KryoTupleSerializer; import backtype.storm.spout.ISpout; import backtype.storm.task.IBolt; import backtype.storm.task.TopologyContext; import backtype.storm.utils.DisruptorQueue; -import backtype.storm.utils.Utils; import backtype.storm.utils.WorkerClassLoader; import clojure.lang.Atom; - import com.alibaba.jstorm.callback.AsyncLoopDefaultKill; import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.cluster.StormClusterState; -import com.alibaba.jstorm.cluster.StormZkClusterState; import com.alibaba.jstorm.daemon.worker.WorkerData; import com.alibaba.jstorm.schedule.Assignment.AssignmentType; import com.alibaba.jstorm.task.comm.TaskSendTargets; @@ -55,29 +44,27 @@ import com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors; import com.alibaba.jstorm.task.execute.spout.SingleThreadSpoutExecutors; import com.alibaba.jstorm.task.execute.spout.SpoutExecutors; import com.alibaba.jstorm.task.group.MkGrouper; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable; -import com.alibaba.jstorm.task.heartbeat.TaskStats; import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; -import com.alibaba.jstorm.utils.NetWorkUtils; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.ProducerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** * Task instance * * @author yannian/Longda - * */ -public class Task { - +public class Task implements Runnable{ private final static Logger LOG = LoggerFactory.getLogger(Task.class); private Map<Object, Object> stormConf; private TopologyContext topologyContext; private TopologyContext userContext; - private String topologyid; private IContext context; private TaskTransfer taskTransfer; @@ -86,8 +73,9 @@ public class Task { private Map<Integer, DisruptorQueue> deserializeQueues; private AsyncLoopDefaultKill workHalt; - private Integer taskid; - private String componentid; + private String topologyId; + private Integer taskId; + private String componentId; private volatile TaskStatus taskStatus; private Atom openOrPrepareWasCalled; // running time counter @@ -97,63 +85,46 @@ public class Task { private Object taskObj; private TaskBaseMetric taskStats; private WorkerData workerData; - private String componentType; // "spout" or "bolt" private TaskSendTargets taskSendTargets; + private TaskReportErrorAndDie reportErrorDie; private boolean isTaskBatchTuple; + private TaskShutdownDameon taskShutdownDameon; @SuppressWarnings("rawtypes") public Task(WorkerData workerData, int taskId) throws Exception { openOrPrepareWasCalled = new Atom(Boolean.valueOf(false)); this.workerData = workerData; - this.topologyContext = - workerData.getContextMaker().makeTopologyContext( - workerData.getSysTopology(), taskId, - openOrPrepareWasCalled); - this.userContext = - workerData.getContextMaker().makeTopologyContext( - workerData.getRawTopology(), taskId, - openOrPrepareWasCalled); - this.taskid = taskId; - this.componentid = topologyContext.getThisComponentId(); - this.stormConf = - Common.component_conf(workerData.getStormConf(), - topologyContext, componentid); + this.topologyContext = workerData.getContextMaker().makeTopologyContext(workerData.getSysTopology(), taskId, openOrPrepareWasCalled); + this.userContext = workerData.getContextMaker().makeTopologyContext(workerData.getRawTopology(), taskId, openOrPrepareWasCalled); + this.taskId = taskId; + this.componentId = topologyContext.getThisComponentId(); + this.stormConf = Common.component_conf(workerData.getStormConf(), topologyContext, componentId); this.taskStatus = new TaskStatus(); - this.taskStats = new TaskBaseMetric(taskId); this.innerTaskTransfer = workerData.getInnerTaskTransfer(); this.deserializeQueues = workerData.getDeserializeQueues(); - this.topologyid = workerData.getTopologyId(); + this.topologyId = workerData.getTopologyId(); this.context = workerData.getContext(); this.workHalt = workerData.getWorkHalt(); - this.zkCluster = - new StormZkClusterState(workerData.getZkClusterstate()); + this.zkCluster =workerData.getZkCluster(); + this.taskStats = new TaskBaseMetric(topologyId, componentId, taskId, + ConfigExtension.isEnableMetrics(workerData.getStormConf())); - LOG.info("Begin to deserialize taskObj " + componentid + ":" + taskid); + LOG.info("Begin to deserialize taskObj " + componentId + ":" + this.taskId); WorkerClassLoader.switchThreadContext(); // get real task object -- spout/bolt/spoutspec - this.taskObj = - Common.get_task_object(topologyContext.getRawTopology(), - componentid, WorkerClassLoader.getInstance()); + this.taskObj = Common.get_task_object(topologyContext.getRawTopology(), componentId, WorkerClassLoader.getInstance()); WorkerClassLoader.restoreThreadContext(); isTaskBatchTuple = ConfigExtension.isTaskBatchTuple(stormConf); LOG.info("Transfer/receive in batch mode :" + isTaskBatchTuple); - LOG.info("Loading task " + componentid + ":" + taskid); - } - - private void setComponentType() { - if (taskObj instanceof IBolt) { - componentType = "bolt"; - } else if (taskObj instanceof ISpout) { - componentType = "spout"; - } + LOG.info("Loading task " + componentId + ":" + this.taskId); } private TaskSendTargets makeSendTargets() { @@ -161,44 +132,20 @@ public class Task { // get current task's output // <Stream_id,<component, Grouping>> - Map<String, Map<String, MkGrouper>> streamComponentGrouper = - Common.outbound_components(topologyContext, workerData); + Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(topologyContext, workerData); - return new TaskSendTargets(stormConf, component, - streamComponentGrouper, topologyContext, taskStats); + return new TaskSendTargets(stormConf, component, streamComponentGrouper, topologyContext, taskStats); } private void updateSendTargets() { if (taskSendTargets != null) { - Map<String, Map<String, MkGrouper>> streamComponentGrouper = - Common.outbound_components(topologyContext, workerData); + Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(topologyContext, workerData); taskSendTargets.updateStreamCompGrouper(streamComponentGrouper); } else { LOG.error("taskSendTargets is null when trying to update it."); } } - private TaskTransfer mkTaskSending(WorkerData workerData) { - - // sending tuple's serializer - KryoTupleSerializer serializer = - new KryoTupleSerializer(workerData.getStormConf(), - topologyContext); - - String taskName = JStormServerUtils.getName(componentid, taskid); - // Task sending all tuples through this Object - TaskTransfer taskTransfer; - if (isTaskBatchTuple) - taskTransfer = - new TaskBatchTransfer(this, taskName, serializer, - taskStatus, workerData); - else - taskTransfer = - new TaskTransfer(this, taskName, serializer, taskStatus, - workerData); - return taskTransfer; - } - public TaskSendTargets echoToSystemBolt() { // send "startup" tuple to system bolt List<Object> msg = new ArrayList<Object>(); @@ -206,9 +153,7 @@ public class Task { // create task receive object TaskSendTargets sendTargets = makeSendTargets(); - UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, - Common.SYSTEM_STREAM_ID, msg); - + UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Common.SYSTEM_STREAM_ID, msg); return sendTargets; } @@ -217,102 +162,90 @@ public class Task { if (isOnePending == true) { return true; } - return ConfigExtension.isSpoutSingleThread(conf); } - public RunnableCallback mk_executors(TaskSendTargets sendTargets, - ITaskReportErr report_error) { + public BaseExecutors mkExecutor() { + BaseExecutors baseExecutor = null; if (taskObj instanceof IBolt) { - return new BoltExecutors(this, (IBolt) taskObj, taskTransfer, - innerTaskTransfer, stormConf, sendTargets, taskStatus, - topologyContext, userContext, taskStats, report_error); + baseExecutor = new BoltExecutors(this); } else if (taskObj instanceof ISpout) { if (isSingleThread(stormConf) == true) { - return new SingleThreadSpoutExecutors(this, (ISpout) taskObj, - taskTransfer, innerTaskTransfer, stormConf, - sendTargets, taskStatus, topologyContext, userContext, - taskStats, report_error); + baseExecutor = new SingleThreadSpoutExecutors(this); } else { - return new MultipleThreadSpoutExecutors(this, (ISpout) taskObj, - taskTransfer, innerTaskTransfer, stormConf, - sendTargets, taskStatus, topologyContext, userContext, - taskStats, report_error); + baseExecutor = new MultipleThreadSpoutExecutors(this); } } - - return null; + + return baseExecutor; } /** * create executor to receive tuples and run bolt/spout execute function - * - * @param puller - * @param sendTargets - * @return */ - private RunnableCallback mkExecutor(TaskSendTargets sendTargets) { + private RunnableCallback prepareExecutor() { // create report error callback, // in fact it is storm_cluster.report-task-error - ITaskReportErr reportError = - new TaskReportError(zkCluster, topologyid, taskid); + ITaskReportErr reportError = new TaskReportError(zkCluster, topologyId, taskId); // report error and halt worker - TaskReportErrorAndDie reportErrorDie = - new TaskReportErrorAndDie(reportError, workHalt); + reportErrorDie = new TaskReportErrorAndDie(reportError, workHalt); + + final BaseExecutors baseExecutor = mkExecutor(); - return mk_executors(sendTargets, reportErrorDie); + return baseExecutor; } public TaskReceiver mkTaskReceiver() { - String taskName = JStormServerUtils.getName(componentid, taskid); - TaskReceiver taskReceiver; + String taskName = JStormServerUtils.getName(componentId, taskId); if (isTaskBatchTuple) - taskReceiver = - new TaskBatchReceiver(this, taskid, stormConf, - topologyContext, innerTaskTransfer, taskStatus, - taskName); + taskReceiver = new TaskBatchReceiver(this, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName); else - taskReceiver = - new TaskReceiver(this, taskid, stormConf, topologyContext, - innerTaskTransfer, taskStatus, taskName); - deserializeQueues.put(taskid, taskReceiver.getDeserializeQueue()); + taskReceiver = new TaskReceiver(this, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName); + deserializeQueues.put(taskId, taskReceiver.getDeserializeQueue()); return taskReceiver; } public TaskShutdownDameon execute() throws Exception { - setComponentType(); taskSendTargets = echoToSystemBolt(); // create thread to get tuple from zeroMQ, // and pass the tuple to bolt/spout taskTransfer = mkTaskSending(workerData); - RunnableCallback baseExecutor = mkExecutor(taskSendTargets); - AsyncLoopThread executor_threads = - new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY, - true); + RunnableCallback baseExecutor = prepareExecutor(); + AsyncLoopThread executor_threads = new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY, true); taskReceiver = mkTaskReceiver(); List<AsyncLoopThread> allThreads = new ArrayList<AsyncLoopThread>(); allThreads.add(executor_threads); - TaskHeartbeatRunable.registerTaskStats(taskid, new TaskStats( - componentType, taskStats)); - LOG.info("Finished loading task " + componentid + ":" + taskid); + LOG.info("Finished loading task " + componentId + ":" + taskId); - return getShutdown(allThreads, taskReceiver.getDeserializeQueue(), + taskShutdownDameon = getShutdown(allThreads, taskReceiver.getDeserializeQueue(), baseExecutor); + return taskShutdownDameon; } - public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads, - DisruptorQueue deserializeQueue, RunnableCallback baseExecutor) { + private TaskTransfer mkTaskSending(WorkerData workerData) { + // sending tuple's serializer + KryoTupleSerializer serializer = new KryoTupleSerializer(workerData.getStormConf(), topologyContext); + String taskName = JStormServerUtils.getName(componentId, taskId); + // Task sending all tuples through this Object + TaskTransfer taskTransfer; + if (isTaskBatchTuple) + taskTransfer = new TaskBatchTransfer(this, taskName, serializer, taskStatus, workerData); + else + taskTransfer = new TaskTransfer(this, taskName, serializer, taskStatus, workerData); + return taskTransfer; + } + + public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads, DisruptorQueue deserializeQueue, RunnableCallback baseExecutor) { AsyncLoopThread ackerThread = null; if (baseExecutor instanceof SpoutExecutors) { - ackerThread = - ((SpoutExecutors) baseExecutor).getAckerRunnableThread(); + ackerThread = ((SpoutExecutors) baseExecutor).getAckerRunnableThread(); if (ackerThread != null) { allThreads.add(ackerThread); @@ -324,24 +257,30 @@ public class Task { AsyncLoopThread serializeThread = taskTransfer.getSerializeThread(); allThreads.add(serializeThread); - TaskShutdownDameon shutdown = - new TaskShutdownDameon(taskStatus, topologyid, taskid, - allThreads, zkCluster, taskObj); + TaskShutdownDameon shutdown = new TaskShutdownDameon(taskStatus, topologyId, taskId, allThreads, zkCluster, taskObj, this); return shutdown; } - public static TaskShutdownDameon mk_task(WorkerData workerData, int taskId) - throws Exception { + public TaskShutdownDameon getTaskShutdownDameon(){ + return taskShutdownDameon; + } - Task t = new Task(workerData, taskId); + public void run(){ + try { + taskShutdownDameon=this.execute(); + }catch (Throwable e){ + LOG.error("init task take error", e); + } + } + public static TaskShutdownDameon mk_task(WorkerData workerData, int taskId) throws Exception { + Task t = new Task(workerData, taskId); return t.execute(); } /** - * Update the data which can be changed dynamically e.g. when scale-out of a - * task parallelism + * Update the data which can be changed dynamically e.g. when scale-out of a task parallelism */ public void updateTaskData() { // Only update the local task list of topologyContext here. Because @@ -359,12 +298,94 @@ public class Task { public long getWorkerAssignmentTs() { return workerData.getAssignmentTs(); } - + public AssignmentType getWorkerAssignmentType() { return workerData.getAssignmentType(); } public void unregisterDeserializeQueue() { - deserializeQueues.remove(taskid); + deserializeQueues.remove(taskId); + } + + public String getComponentId() { + return componentId; + } + + public Integer getTaskId() { + return taskId; + } + + public DisruptorQueue getExecuteQueue() { + return innerTaskTransfer.get(taskId); + } + + public DisruptorQueue getDeserializeQueue() { + return deserializeQueues.get(taskId); } + + public Map<Object, Object> getStormConf() { + return stormConf; + } + + public TopologyContext getTopologyContext() { + return topologyContext; + } + + public TopologyContext getUserContext() { + return userContext; + } + + public TaskTransfer getTaskTransfer() { + return taskTransfer; + } + + public TaskReceiver getTaskReceiver() { + return taskReceiver; + } + + public Map<Integer, DisruptorQueue> getInnerTaskTransfer() { + return innerTaskTransfer; + } + + public Map<Integer, DisruptorQueue> getDeserializeQueues() { + return deserializeQueues; + } + + public String getTopologyId() { + return topologyId; + } + + public TaskStatus getTaskStatus() { + return taskStatus; + } + + public StormClusterState getZkCluster() { + return zkCluster; + } + + public Object getTaskObj() { + return taskObj; + } + + public TaskBaseMetric getTaskStats() { + return taskStats; + } + + public WorkerData getWorkerData() { + return workerData; + } + + public TaskSendTargets getTaskSendTargets() { + return taskSendTargets; + } + + public TaskReportErrorAndDie getReportErrorDie() { + return reportErrorDie; + } + + public boolean isTaskBatchTuple() { + return isTaskBatchTuple; + } + + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java index 4c9eb0b..84c6151 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java @@ -17,116 +17,110 @@ */ package com.alibaba.jstorm.task; -import java.io.Serializable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.alibaba.jstorm.cluster.Common; -import com.alibaba.jstorm.common.metric.MetricRegistry; -import com.alibaba.jstorm.common.metric.window.Metric; +import com.alibaba.jstorm.common.metric.AsmMetric; import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class TaskBaseMetric implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(TaskBaseMetric.class); + private static final Logger logger = LoggerFactory.getLogger(JStormMetrics.class); + private static final long serialVersionUID = -7157987126460293444L; - protected MetricRegistry metrics; + private String topologyId; + private String componentId; private int taskId; + private boolean enableMetrics; - public TaskBaseMetric(int taskId) { - metrics = JStormMetrics.registerTask(taskId); + /** + * local metric name cache to avoid frequent metric name concatenation streamId + name ==> full metric name + */ + private static final ConcurrentMap<String, AsmMetric> metricCache = new ConcurrentHashMap<String, AsmMetric>(); + + public TaskBaseMetric(String topologyId, String componentId, int taskId, boolean enableMetrics) { + this.topologyId = topologyId; + this.componentId = componentId; this.taskId = taskId; + this.enableMetrics = enableMetrics; + logger.info("init task base metric, tp id:{}, comp id:{}, task id:{}", topologyId, componentId, taskId); } - public void update(String name, Number value, int type) { - Metric metric = metrics.getMetric(name); - if (metric == null) { - metric = JStormMetrics.Builder.mkInstance(type); - try { - /** - * Here use one hack method to handle competition register metric - * if duplicated metric, just skip it. - * - * this will improve performance - */ - JStormMetrics.registerTaskMetric(metric, taskId, name); - }catch(Exception e) { - LOG.warn("Duplicated metrics of {}, taskId:{}", name, taskId); - return ; + public void update(final String streamId, final String name, final Number value, final MetricType metricType, + boolean mergeTopology) { + String key = taskId + streamId + name; + AsmMetric existingMetric = metricCache.get(key); + if (existingMetric == null) { + String fullName = MetricUtils.streamMetricName(topologyId, componentId, taskId, streamId, name, metricType); + existingMetric = JStormMetrics.getStreamMetric(fullName); + if (existingMetric == null) { + existingMetric = AsmMetric.Builder.build(metricType); + JStormMetrics.registerStreamMetric(fullName, existingMetric, mergeTopology); } - + metricCache.putIfAbsent(key, existingMetric); } - metric.update(value); + + existingMetric.update(value); + } + + public void update(final String streamId, final String name, final Number value, final MetricType metricType) { + update(streamId, name, value, metricType, true); } public void send_tuple(String stream, int num_out_tasks) { - if (num_out_tasks <= 0) { - return; + if (enableMetrics && num_out_tasks > 0) { + update(stream, MetricDef.EMMITTED_NUM, num_out_tasks, MetricType.COUNTER); + update(stream, MetricDef.SEND_TPS, num_out_tasks, MetricType.METER); } - - String emmitedName = - MetricRegistry.name(MetricDef.EMMITTED_NUM, stream); - update(emmitedName, Double.valueOf(num_out_tasks), - JStormMetrics.Builder.COUNTER); - - String sendTpsName = MetricRegistry.name(MetricDef.SEND_TPS, stream); - update(sendTpsName, Double.valueOf(num_out_tasks), - JStormMetrics.Builder.METER); } public void recv_tuple(String component, String stream) { - - String name = - MetricRegistry.name(MetricDef.RECV_TPS, component, stream); - update(name, Double.valueOf(1), JStormMetrics.Builder.METER); - + if (enableMetrics) { + update(stream, AsmMetric.mkName(component, MetricDef.RECV_TPS), 1, MetricType.METER); +// update(stream, MetricDef.RECV_TPS, 1, MetricType.METER); + } } - public void bolt_acked_tuple(String component, String stream, - Double latency_ms) { + public void bolt_acked_tuple(String component, String stream, Long latency, Long lifeCycle) { + if (enableMetrics) { +// update(stream, AsmMetric.mkName(component, MetricDef.ACKED_NUM), 1, MetricType.COUNTER); +// update(stream, AsmMetric.mkName(component, MetricDef.PROCESS_LATENCY), latency_ms, MetricType.HISTOGRAM); + update(stream, MetricDef.ACKED_NUM, 1, MetricType.COUNTER); + update(stream, MetricDef.PROCESS_LATENCY, latency, MetricType.HISTOGRAM, false); - if (latency_ms == null) { - return; + if (lifeCycle > 0) { + update(stream, AsmMetric.mkName(component, MetricDef.TUPLE_LIEF_CYCLE), lifeCycle, MetricType.HISTOGRAM, false); + } } - - String ackNumName = - MetricRegistry.name(MetricDef.ACKED_NUM, component, stream); - update(ackNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER); - - String processName = - MetricRegistry.name(MetricDef.PROCESS_LATENCY, component, - stream); - update(processName, latency_ms, - JStormMetrics.Builder.HISTOGRAM); } public void bolt_failed_tuple(String component, String stream) { - - String failNumName = - MetricRegistry.name(MetricDef.FAILED_NUM, component, stream); - update(failNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER); + if (enableMetrics) { + //update(stream, AsmMetric.mkName(component, MetricDef.FAILED_NUM), 1, MetricType.COUNTER); + update(stream, MetricDef.FAILED_NUM, 1, MetricType.COUNTER); + } } - public void spout_acked_tuple(String stream, long st) { - - String ackNumName = - MetricRegistry.name(MetricDef.ACKED_NUM, - Common.ACKER_COMPONENT_ID, stream); - update(ackNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER); - - String processName = - MetricRegistry.name(MetricDef.PROCESS_LATENCY, - Common.ACKER_COMPONENT_ID, stream); - update(processName, Double.valueOf(st), JStormMetrics.Builder.HISTOGRAM); + public void spout_acked_tuple(String stream, long st, Long lifeCycle) { + if (enableMetrics) { + update(stream, MetricDef.ACKED_NUM, 1, MetricType.COUNTER); + update(stream, MetricDef.PROCESS_LATENCY, st, MetricType.HISTOGRAM, true); + if (lifeCycle > 0) { + update(stream, AsmMetric.mkName(Common.ACKER_COMPONENT_ID, MetricDef.TUPLE_LIEF_CYCLE), lifeCycle, MetricType.HISTOGRAM, false); + } + } } public void spout_failed_tuple(String stream) { - String failNumName = - MetricRegistry.name(MetricDef.FAILED_NUM, - Common.ACKER_COMPONENT_ID, stream); - update(failNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER); - + if (enableMetrics) { + update(stream, MetricDef.FAILED_NUM, 1, MetricType.COUNTER); + } } }
