http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java index e984455..fab05ea 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java @@ -17,51 +17,36 @@ */ package com.alibaba.jstorm.daemon.nimbus; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.alibaba.jstorm.callback.Callback; -import com.alibaba.jstorm.callback.impl.ActiveTransitionCallback; -import com.alibaba.jstorm.callback.impl.DoRebalanceTransitionCallback; -import com.alibaba.jstorm.callback.impl.DoneRebalanceTransitionCallback; -import com.alibaba.jstorm.callback.impl.InactiveTransitionCallback; -import com.alibaba.jstorm.callback.impl.KillTransitionCallback; -import com.alibaba.jstorm.callback.impl.ReassignTransitionCallback; -import com.alibaba.jstorm.callback.impl.RebalanceTransitionCallback; -import com.alibaba.jstorm.callback.impl.RemoveTransitionCallback; -import com.alibaba.jstorm.callback.impl.UpdateConfTransitionCallback; +import com.alibaba.jstorm.callback.impl.*; import com.alibaba.jstorm.cluster.StormBase; import com.alibaba.jstorm.cluster.StormStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Status changing * * @author version1: lixin version2: Longda - * - * - * */ public class StatusTransition { - private final static Logger LOG = LoggerFactory - .getLogger(StatusTransition.class); + private final static Logger LOG = LoggerFactory.getLogger(StatusTransition.class); private NimbusData data; - private Map<String, Object> topologyLocks = - new ConcurrentHashMap<String, Object>(); + private Map<String, Object> topologyLocks = new ConcurrentHashMap<String, Object>(); public StatusTransition(NimbusData data) { this.data = data; } - public <T> void transition(String topologyid, boolean errorOnNoTransition, - StatusType changeStatus, T... args) throws Exception { + public <T> void transition(String topologyid, boolean errorOnNoTransition, StatusType changeStatus, T... args) throws Exception { // lock outside Object lock = topologyLocks.get(topologyid); if (lock == null) { @@ -70,8 +55,7 @@ public class StatusTransition { } if (data.getIsShutdown().get() == true) { - LOG.info("Nimbus is in shutdown, skip this event " + topologyid - + ":" + changeStatus); + LOG.info("Nimbus is in shutdown, skip this event " + topologyid + ":" + changeStatus); return; } @@ -86,50 +70,34 @@ public class StatusTransition { /** * Changing status * - * @param topologyId - * @param errorOnNTransition if it is true, failure will throw exception * @param args -- will be used in the status changing callback - * */ - public <T> void transitionLock(String topologyid, - boolean errorOnNoTransition, StatusType changeStatus, T... args) - throws Exception { + public <T> void transitionLock(String topologyid, boolean errorOnNoTransition, StatusType changeStatus, T... args) throws Exception { // get ZK's topology node's data, which is StormBase - StormBase stormbase = - data.getStormClusterState().storm_base(topologyid, null); + StormBase stormbase = data.getStormClusterState().storm_base(topologyid, null); if (stormbase == null) { - LOG.error("Cannot apply event changing status " - + changeStatus.getStatus() + " to " + topologyid - + " because failed to get StormBase from ZK"); + LOG.error("Cannot apply event changing status " + changeStatus.getStatus() + " to " + topologyid + " because failed to get StormBase from ZK"); return; } StormStatus currentStatus = stormbase.getStatus(); if (currentStatus == null) { - LOG.error("Cannot apply event changing status " - + changeStatus.getStatus() + " to " + topologyid - + " because topologyStatus is null in ZK"); + LOG.error("Cannot apply event changing status " + changeStatus.getStatus() + " to " + topologyid + " because topologyStatus is null in ZK"); return; } // <currentStatus, Map<changingStatus, callback>> - Map<StatusType, Map<StatusType, Callback>> callbackMap = - stateTransitions(topologyid, currentStatus); + Map<StatusType, Map<StatusType, Callback>> callbackMap = stateTransitions(topologyid, currentStatus); // get current changingCallbacks - Map<StatusType, Callback> changingCallbacks = - callbackMap.get(currentStatus.getStatusType()); + Map<StatusType, Callback> changingCallbacks = callbackMap.get(currentStatus.getStatusType()); - if (changingCallbacks == null - || changingCallbacks.containsKey(changeStatus) == false - || changingCallbacks.get(changeStatus) == null) { + if (changingCallbacks == null || changingCallbacks.containsKey(changeStatus) == false || changingCallbacks.get(changeStatus) == null) { String msg = - "No transition for event: changing status:" - + changeStatus.getStatus() + ", current status: " - + currentStatus.getStatusType() + " topology-id: " - + topologyid; + "No transition for event: changing status:" + changeStatus.getStatus() + ", current status: " + currentStatus.getStatusType() + + " topology-id: " + topologyid; LOG.info(msg); if (errorOnNoTransition) { throw new RuntimeException(msg); @@ -144,12 +112,10 @@ public class StatusTransition { StormStatus newStatus = (StormStatus) obj; // update status to ZK data.getStormClusterState().update_storm(topologyid, newStatus); - LOG.info("Successfully updated " + topologyid + " as status " - + newStatus); + LOG.info("Successfully updated " + topologyid + " as status " + newStatus); } - LOG.info("Successfully apply event changing status " - + changeStatus.getStatus() + " to " + topologyid); + LOG.info("Successfully apply event changing status " + changeStatus.getStatus() + " to " + topologyid); return; } @@ -157,104 +123,74 @@ public class StatusTransition { /** * generate status changing map * - * - * * @param topologyid - * @param status - * @return - * - * Map<StatusType, Map<StatusType, Callback>> means - * Map<currentStatus, Map<changingStatus, Callback>> + * @return Map<StatusType, Map<StatusType, Callback>> means Map<currentStatus, Map<changingStatus, Callback>> */ - private Map<StatusType, Map<StatusType, Callback>> stateTransitions( - String topologyid, StormStatus currentStatus) { + private Map<StatusType, Map<StatusType, Callback>> stateTransitions(String topologyid, StormStatus currentStatus) { /** * - * 1. Status: this status will be stored in ZK - * killed/inactive/active/rebalancing 2. action: + * 1. Status: this status will be stored in ZK killed/inactive/active/rebalancing 2. action: * - * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger - * this only valid when current status is active inactivate -- client - * will trigger this action, only valid when current status is active - * activate -- client will trigger this action only valid when current - * status is inactive startup -- when nimbus startup, it will trigger - * this action only valid when current status is killed/rebalancing kill - * -- client kill topology will trigger this action, only valid when - * current status is active/inactive/killed remove -- 30 seconds after - * client submit kill command, it will do this action, only valid when - * current status is killed rebalance -- client submit rebalance - * command, only valid when current status is active/deactive - * do_rebalance -- 30 seconds after client submit rebalance command, it - * will do this action, only valid when current status is rebalance + * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this only valid when current status is active inactivate -- client will trigger + * this action, only valid when current status is active activate -- client will trigger this action only valid when current status is inactive startup + * -- when nimbus startup, it will trigger this action only valid when current status is killed/rebalancing kill -- client kill topology will trigger + * this action, only valid when current status is active/inactive/killed remove -- 30 seconds after client submit kill command, it will do this action, + * only valid when current status is killed rebalance -- client submit rebalance command, only valid when current status is active/deactive do_rebalance + * -- 30 seconds after client submit rebalance command, it will do this action, only valid when current status is rebalance */ - Map<StatusType, Map<StatusType, Callback>> rtn = - new HashMap<StatusType, Map<StatusType, Callback>>(); + Map<StatusType, Map<StatusType, Callback>> rtn = new HashMap<StatusType, Map<StatusType, Callback>>(); // current status is active - Map<StatusType, Callback> activeMap = - new HashMap<StatusType, Callback>(); - activeMap.put(StatusType.monitor, new ReassignTransitionCallback(data, - topologyid)); + Map<StatusType, Callback> activeMap = new HashMap<StatusType, Callback>(); + activeMap.put(StatusType.monitor, new ReassignTransitionCallback(data, topologyid)); activeMap.put(StatusType.inactivate, new InactiveTransitionCallback()); activeMap.put(StatusType.startup, null); activeMap.put(StatusType.activate, null); - activeMap.put(StatusType.kill, new KillTransitionCallback(data, - topologyid)); + activeMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid)); activeMap.put(StatusType.remove, null); - activeMap.put(StatusType.rebalance, new RebalanceTransitionCallback( - data, topologyid, currentStatus)); + activeMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus)); activeMap.put(StatusType.do_rebalance, null); activeMap.put(StatusType.done_rebalance, null); - activeMap.put(StatusType.update_conf, new UpdateConfTransitionCallback( - data, topologyid, currentStatus)); + activeMap.put(StatusType.update_topology, new UpdateTopologyTransitionCallback(data, topologyid, currentStatus)); rtn.put(StatusType.active, activeMap); // current status is inactive - Map<StatusType, Callback> inactiveMap = - new HashMap<StatusType, Callback>(); + Map<StatusType, Callback> inactiveMap = new HashMap<StatusType, Callback>(); - inactiveMap.put(StatusType.monitor, new ReassignTransitionCallback( - data, topologyid, new StormStatus(StatusType.inactive))); + inactiveMap.put(StatusType.monitor, new ReassignTransitionCallback(data, topologyid, new StormStatus(StatusType.inactive))); inactiveMap.put(StatusType.inactivate, null); inactiveMap.put(StatusType.startup, null); inactiveMap.put(StatusType.activate, new ActiveTransitionCallback()); - inactiveMap.put(StatusType.kill, new KillTransitionCallback(data, - topologyid)); + inactiveMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid)); inactiveMap.put(StatusType.remove, null); - inactiveMap.put(StatusType.rebalance, new RebalanceTransitionCallback( - data, topologyid, currentStatus)); + inactiveMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus)); inactiveMap.put(StatusType.do_rebalance, null); inactiveMap.put(StatusType.done_rebalance, null); - inactiveMap.put(StatusType.update_conf, null); + inactiveMap.put(StatusType.update_topology, null); rtn.put(StatusType.inactive, inactiveMap); // current status is killed - Map<StatusType, Callback> killedMap = - new HashMap<StatusType, Callback>(); + Map<StatusType, Callback> killedMap = new HashMap<StatusType, Callback>(); killedMap.put(StatusType.monitor, null); killedMap.put(StatusType.inactivate, null); - killedMap.put(StatusType.startup, new KillTransitionCallback(data, - topologyid)); + killedMap.put(StatusType.startup, new KillTransitionCallback(data, topologyid)); killedMap.put(StatusType.activate, null); - killedMap.put(StatusType.kill, new KillTransitionCallback(data, - topologyid)); - killedMap.put(StatusType.remove, new RemoveTransitionCallback(data, - topologyid)); + killedMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid)); + killedMap.put(StatusType.remove, new RemoveTransitionCallback(data, topologyid)); killedMap.put(StatusType.rebalance, null); killedMap.put(StatusType.do_rebalance, null); killedMap.put(StatusType.done_rebalance, null); - killedMap.put(StatusType.update_conf, null); + killedMap.put(StatusType.update_topology, null); rtn.put(StatusType.killed, killedMap); // current status is under rebalancing - Map<StatusType, Callback> rebalancingMap = - new HashMap<StatusType, Callback>(); + Map<StatusType, Callback> rebalancingMap = new HashMap<StatusType, Callback>(); StatusType rebalanceOldStatus = StatusType.active; if (currentStatus.getOldStatus() != null) { @@ -267,20 +203,14 @@ public class StatusTransition { rebalancingMap.put(StatusType.monitor, null); rebalancingMap.put(StatusType.inactivate, null); - rebalancingMap.put(StatusType.startup, new RebalanceTransitionCallback( - data, topologyid, new StormStatus(rebalanceOldStatus))); + rebalancingMap.put(StatusType.startup, new RebalanceTransitionCallback(data, topologyid, new StormStatus(rebalanceOldStatus))); rebalancingMap.put(StatusType.activate, null); rebalancingMap.put(StatusType.kill, null); rebalancingMap.put(StatusType.remove, null); - rebalancingMap - .put(StatusType.rebalance, new RebalanceTransitionCallback( - data, topologyid, currentStatus)); - rebalancingMap.put(StatusType.do_rebalance, - new DoRebalanceTransitionCallback(data, topologyid, - new StormStatus(rebalanceOldStatus))); - rebalancingMap.put(StatusType.done_rebalance, - new DoneRebalanceTransitionCallback(data, topologyid)); - rebalancingMap.put(StatusType.update_conf, null); + rebalancingMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus)); + rebalancingMap.put(StatusType.do_rebalance, new DoRebalanceTransitionCallback(data, topologyid, new StormStatus(rebalanceOldStatus))); + rebalancingMap.put(StatusType.done_rebalance, new DoneRebalanceTransitionCallback(data, topologyid)); + rebalancingMap.put(StatusType.update_topology, null); rtn.put(StatusType.rebalancing, rebalancingMap); /** @@ -288,7 +218,6 @@ public class StatusTransition { */ return rtn; - } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java index cf785b7..d0f68ff 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java @@ -20,21 +20,14 @@ package com.alibaba.jstorm.daemon.nimbus; /** * topology status: * - * 1. Status: this status will be stored in ZK - * killed/inactive/active/rebalancing 2. action: + * 1. Status: this status will be stored in ZK killed/inactive/active/rebalancing 2. action: * - * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this - * only valid when current status is active inactivate -- client will trigger - * this action, only valid when current status is active activate -- client will - * trigger this action only valid when current status is inactive startup -- - * when nimbus startup, it will trigger this action only valid when current - * status is killed/rebalancing kill -- client kill topology will trigger this - * action, only valid when current status is active/inactive/killed remove -- 30 - * seconds after client submit kill command, it will do this action, only valid - * when current status is killed rebalance -- client submit rebalance command, - * only valid when current status is active/deactive do_rebalance -- 30 seconds - * after client submit rebalance command, it will do this action, only valid - * when current status is rebalance + * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this only valid when current status is active inactivate -- client will trigger this + * action, only valid when current status is active activate -- client will trigger this action only valid when current status is inactive startup -- when + * nimbus startup, it will trigger this action only valid when current status is killed/rebalancing kill -- client kill topology will trigger this action, only + * valid when current status is active/inactive/killed remove -- 30 seconds after client submit kill command, it will do this action, only valid when current + * status is killed rebalance -- client submit rebalance command, only valid when current status is active/deactive do_rebalance -- 30 seconds after client + * submit rebalance command, it will do this action, only valid when current status is rebalance * * * @@ -43,13 +36,11 @@ package com.alibaba.jstorm.daemon.nimbus; public enum StatusType { // status - active("active"), inactive("inactive"), rebalancing("rebalancing"), killed( - "killed"), + active("active"), inactive("inactive"), rebalancing("rebalancing"), killed("killed"), // actions - activate("activate"), inactivate("inactivate"), monitor("monitor"), startup( - "startup"), kill("kill"), remove("remove"), rebalance("rebalance"), do_rebalance( - "do-rebalance"), done_rebalance("done-rebalance"), update_conf("update-config"); + activate("activate"), inactivate("inactivate"), monitor("monitor"), startup("startup"), kill("kill"), remove("remove"), rebalance("rebalance"), do_rebalance( + "do-rebalance"), done_rebalance("done-rebalance"), update_topology("update-topoloogy"); private String status; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java index fd6f461..51da198 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java @@ -17,33 +17,12 @@ */ package com.alibaba.jstorm.daemon.nimbus; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.LinkedBlockingQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.generated.StormTopology; import backtype.storm.scheduler.WorkerSlot; - import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.cluster.Cluster; -import com.alibaba.jstorm.cluster.StormBase; -import com.alibaba.jstorm.cluster.StormClusterState; -import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.cluster.StormStatus; +import com.alibaba.jstorm.cluster.*; +import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.TaskStartEvent; import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo; import com.alibaba.jstorm.schedule.Assignment; import com.alibaba.jstorm.schedule.AssignmentBak; @@ -51,15 +30,23 @@ import com.alibaba.jstorm.schedule.IToplogyScheduler; import com.alibaba.jstorm.schedule.TopologyAssignContext; import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyScheduler; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; +import com.alibaba.jstorm.task.TaskInfo; import com.alibaba.jstorm.utils.FailedAssignTopologyException; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; import com.alibaba.jstorm.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.LinkedBlockingQueue; public class TopologyAssign implements Runnable { - private final static Logger LOG = LoggerFactory - .getLogger(TopologyAssign.class); + private final static Logger LOG = LoggerFactory.getLogger(TopologyAssign.class); /** * private constructor function to avoid multiple instance @@ -93,7 +80,7 @@ public class TopologyAssign implements Runnable { public void init(NimbusData nimbusData) { this.nimbusData = nimbusData; - //this.cleanupTimeoutSec = 60; + // this.cleanupTimeoutSec = 60; this.schedulers = new HashMap<String, IToplogyScheduler>(); @@ -113,8 +100,7 @@ public class TopologyAssign implements Runnable { thread.interrupt(); } - protected static LinkedBlockingQueue<TopologyAssignEvent> queue = - new LinkedBlockingQueue<TopologyAssignEvent>(); + protected static LinkedBlockingQueue<TopologyAssignEvent> queue = new LinkedBlockingQueue<TopologyAssignEvent>(); public static void push(TopologyAssignEvent event) { queue.offer(event); @@ -159,12 +145,37 @@ public class TopologyAssign implements Runnable { * @return */ protected boolean doTopologyAssignment(TopologyAssignEvent event) { - Assignment assignment = null; + Assignment assignment; try { + Assignment oldAssignment = null; + boolean isReassign = event.isScratch(); + if (isReassign) { + oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null); + } assignment = mkAssignment(event); - if (!(event.isScratch())) + // notify jstorm monitor on task assign/reassign/rebalance + TaskStartEvent taskEvent = new TaskStartEvent(); + taskEvent.oldAssignment = oldAssignment; + taskEvent.newAssignment = assignment; + taskEvent.topologyId = event.getTopologyId(); + taskEvent.clusterName = nimbusData.getClusterName(); + taskEvent.timestamp = System.currentTimeMillis(); + + Map<Integer, String> task2Component; + // get from nimbus cache first + Map<Integer, TaskInfo> taskInfoMap = Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId()); + if (taskInfoMap != null) { + task2Component = Common.getTaskToComponent(taskInfoMap); + } else { + task2Component = Common.getTaskToComponent(Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId())); + } + taskEvent.task2Component = task2Component; + nimbusData.getMetricRunnable().pushEvent(taskEvent); + + if (!isReassign) { setTopologyStatus(event); + } } catch (Throwable e) { LOG.error("Failed to assign topology " + event.getTopologyId(), e); event.fail(e.getMessage()); @@ -180,8 +191,6 @@ public class TopologyAssign implements Runnable { /** * cleanup the topologies which are not in ZK /topology, but in other place * - * @param nimbusData - * @param active_topologys * @throws Exception */ public void cleanupDisappearedTopology() throws Exception { @@ -192,8 +201,7 @@ public class TopologyAssign implements Runnable { return; } - Set<String> cleanupIds = - get_cleanup_ids(clusterState, active_topologys); + Set<String> cleanupIds = get_cleanup_ids(clusterState, active_topologys); for (String topologyId : cleanupIds) { @@ -202,13 +210,12 @@ public class TopologyAssign implements Runnable { clusterState.try_remove_storm(topologyId); // nimbusData.getTaskHeartbeatsCache().remove(topologyId); + nimbusData.getTasksHeartbeat().remove(topologyId); NimbusUtils.removeTopologyTaskTimeout(nimbusData, topologyId); // get /nimbus/stormdist/topologyId - String master_stormdist_root = - StormConfig.masterStormdistRoot(nimbusData.getConf(), - topologyId); + String master_stormdist_root = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId); try { // delete topologyId local dir PathUtils.rmr(master_stormdist_root); @@ -218,14 +225,12 @@ public class TopologyAssign implements Runnable { } } - private void get_code_ids(List<String> code_ids, - HashSet<String> latest_code_ids) throws IOException { + private void get_code_ids(List<String> code_ids, HashSet<String> latest_code_ids) throws IOException { Map conf = nimbusData.getConf(); String master_stormdist_root = StormConfig.masterStormdistRoot(conf); // listdir /local-dir/nimbus/stormdist - List<String> all_code_ids = - PathUtils.read_dir_contents(master_stormdist_root); + List<String> all_code_ids = PathUtils.read_dir_contents(master_stormdist_root); code_ids.addAll(all_code_ids); long now = System.currentTimeMillis(); @@ -238,9 +243,6 @@ public class TopologyAssign implements Runnable { long modify = file.lastModified(); - if (now - modify < cleanupTimeoutSec * 1000) { - latest_code_ids.add(dir); - } } catch (Exception exception) { LOG.error("Failed to get modify time of " + dir, exception); } @@ -256,14 +258,14 @@ public class TopologyAssign implements Runnable { * @return * @throws Exception */ - private Set<String> get_cleanup_ids(StormClusterState clusterState, - List<String> active_topologys) throws Exception { + private Set<String> get_cleanup_ids(StormClusterState clusterState, List<String> active_topologys) throws Exception { List<String> task_ids = clusterState.task_storms(); List<String> heartbeat_ids = clusterState.heartbeat_storms(); List<String> error_ids = clusterState.task_error_storms(); List<String> assignment_ids = clusterState.assignments(null); List<String> metric_ids = clusterState.get_metrics(); + List<String> backpressure_ids = clusterState.backpressureInfos(); List<String> code_ids = new ArrayList<String>(); HashSet<String> latest_code_ids = new HashSet<String>(); @@ -272,8 +274,7 @@ public class TopologyAssign implements Runnable { // Set<String> assigned_ids = // JStormUtils.listToSet(clusterState.active_storms()); Set<String> to_cleanup_ids = new HashSet<String>(); - Set<String> pendingTopologys = - nimbusData.getPendingSubmitTopoloygs().keySet(); + Set<String> pendingTopologys = nimbusData.getPendingSubmitTopoloygs().keySet(); if (task_ids != null) { to_cleanup_ids.addAll(task_ids); @@ -294,9 +295,13 @@ public class TopologyAssign implements Runnable { if (code_ids != null) { to_cleanup_ids.addAll(code_ids); } - + if (metric_ids != null) { - to_cleanup_ids.addAll(metric_ids); + to_cleanup_ids.addAll(metric_ids); + } + + if (backpressure_ids != null) { + to_cleanup_ids.addAll(backpressure_ids); } if (active_topologys != null) { @@ -309,8 +314,7 @@ public class TopologyAssign implements Runnable { } /** - * Why need to remove latest code. Due to competition between - * Thrift.threads and TopologyAssign thread + * Why need to remove latest code. Due to competition between Thrift.threads and TopologyAssign thread * */ to_cleanup_ids.removeAll(latest_code_ids); @@ -321,11 +325,6 @@ public class TopologyAssign implements Runnable { /** * start a topology: set active status of the topology - * - * @param topologyName - * @param stormClusterState - * @param topologyId - * @throws Exception */ public void setTopologyStatus(TopologyAssignEvent event) throws Exception { StormClusterState stormClusterState = nimbusData.getStormClusterState(); @@ -339,15 +338,11 @@ public class TopologyAssign implements Runnable { status = event.getOldStatus(); } - boolean isEnable = - ConfigExtension - .isEnablePerformanceMetrics(nimbusData.getConf()); + boolean isEnable = ConfigExtension.isEnablePerformanceMetrics(nimbusData.getConf()); StormBase stormBase = stormClusterState.storm_base(topologyId, null); if (stormBase == null) { - stormBase = - new StormBase(topologyName, TimeUtils.current_time_secs(), - status, group); + stormBase = new StormBase(topologyName, TimeUtils.current_time_secs(), status, group); stormBase.setEnableMonitor(isEnable); stormClusterState.activate_storm(topologyId, stormBase); @@ -367,18 +362,20 @@ public class TopologyAssign implements Runnable { } - protected TopologyAssignContext prepareTopologyAssign( - TopologyAssignEvent event) throws Exception { + protected TopologyAssignContext prepareTopologyAssign(TopologyAssignEvent event) throws Exception { TopologyAssignContext ret = new TopologyAssignContext(); String topologyId = event.getTopologyId(); + ret.setTopologyId(topologyId); + + int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId(); + ret.setTopologyMasterTaskId(topoMasterId); + LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId); Map<Object, Object> nimbusConf = nimbusData.getConf(); - Map<Object, Object> topologyConf = - StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId); + Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId); - StormTopology rawTopology = - StormConfig.read_nimbus_topology_code(nimbusConf, topologyId); + StormTopology rawTopology = StormConfig.read_nimbus_topology_code(nimbusConf, topologyId); ret.setRawTopology(rawTopology); Map stormConf = new HashMap(); @@ -389,8 +386,7 @@ public class TopologyAssign implements Runnable { StormClusterState stormClusterState = nimbusData.getStormClusterState(); // get all running supervisor, don't need callback to watch supervisor - Map<String, SupervisorInfo> supInfos = - Cluster.get_all_SupervisorInfo(stormClusterState, null); + Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null); // init all AvailableWorkerPorts for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) { SupervisorInfo supervisor = supInfo.getValue(); @@ -400,21 +396,16 @@ public class TopologyAssign implements Runnable { getAliveSupervsByHb(supInfos, nimbusConf); if (supInfos.size() == 0) { - throw new FailedAssignTopologyException( - "Failed to make assignment " + topologyId - + ", due to no alive supervisor"); + throw new FailedAssignTopologyException("Failed to make assignment " + topologyId + ", due to no alive supervisor"); } - Map<Integer, String> taskToComponent = - Cluster.get_all_task_component(stormClusterState, topologyId, null); + Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null); ret.setTaskToComponent(taskToComponent); // get taskids /ZK/tasks/topologyId Set<Integer> allTaskIds = taskToComponent.keySet(); if (allTaskIds == null || allTaskIds.size() == 0) { - String errMsg = - "Failed to get all task ID list from /ZK-dir/tasks/" - + topologyId; + String errMsg = "Failed to get all task ID list from /ZK-dir/tasks/" + topologyId; LOG.warn(errMsg); throw new IOException(errMsg); } @@ -425,18 +416,31 @@ public class TopologyAssign implements Runnable { // machine Set<Integer> unstoppedTasks = new HashSet<Integer>(); Set<Integer> deadTasks = new HashSet<Integer>(); - Set<ResourceWorkerSlot> unstoppedWorkers = - new HashSet<ResourceWorkerSlot>(); + Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>(); - Assignment existingAssignment = - stormClusterState.assignment_info(topologyId, null); + Assignment existingAssignment = stormClusterState.assignment_info(topologyId, null); if (existingAssignment != null) { aliveTasks = getAliveTasks(topologyId, allTaskIds); - unstoppedTasks = - getUnstoppedSlots(aliveTasks, supInfos, existingAssignment); - deadTasks.addAll(allTaskIds); - deadTasks.removeAll(aliveTasks); + /* + * Check if the topology master task is alive first since all task + * heartbeat info is reported by topology master. + * If master is dead, do reassignment for topology master first. + */ + if (aliveTasks.contains(topoMasterId) == false) { + ResourceWorkerSlot worker = existingAssignment.getWorkerByTaskId(topoMasterId); + deadTasks.addAll(worker.getTasks()); + + Set<Integer> tempSet = new HashSet<Integer>(allTaskIds); + tempSet.removeAll(deadTasks); + aliveTasks.addAll(tempSet); + aliveTasks.removeAll(deadTasks); + } else { + deadTasks.addAll(allTaskIds); + deadTasks.removeAll(aliveTasks); + } + + unstoppedTasks = getUnstoppedSlots(aliveTasks, supInfos, existingAssignment); } ret.setDeadTaskIds(deadTasks); @@ -451,9 +455,7 @@ public class TopologyAssign implements Runnable { ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_NEW); try { - AssignmentBak lastAssignment = - stormClusterState.assignment_bak(event - .getTopologyName()); + AssignmentBak lastAssignment = stormClusterState.assignment_bak(event.getTopologyName()); if (lastAssignment != null) { ret.setOldAssignment(lastAssignment.getAssignment()); } @@ -465,13 +467,11 @@ public class TopologyAssign implements Runnable { if (event.isScratch()) { ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_REBALANCE); ret.setIsReassign(event.isReassign()); - unstoppedWorkers = - getUnstoppedWorkers(unstoppedTasks, existingAssignment); + unstoppedWorkers = getUnstoppedWorkers(unstoppedTasks, existingAssignment); ret.setUnstoppedWorkers(unstoppedWorkers); } else { ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_MONITOR); - unstoppedWorkers = - getUnstoppedWorkers(aliveTasks, existingAssignment); + unstoppedWorkers = getUnstoppedWorkers(aliveTasks, existingAssignment); ret.setUnstoppedWorkers(unstoppedWorkers); } } @@ -480,13 +480,8 @@ public class TopologyAssign implements Runnable { } /** - * make assignments for a topology The nimbus core function, this function - * has been totally rewrite + * make assignments for a topology The nimbus core function, this function has been totally rewrite * - * @param nimbusData NimbusData - * @param topologyId String - * @param isScratch Boolean: isScratch is false unless rebalancing the - * topology * @throws Exception */ public Assignment mkAssignment(TopologyAssignEvent event) throws Exception { @@ -500,8 +495,7 @@ public class TopologyAssign implements Runnable { if (!StormConfig.local_mode(nimbusData.getConf())) { - IToplogyScheduler scheduler = - schedulers.get(DEFAULT_SCHEDULER_NAME); + IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); assignments = scheduler.assignTasks(context); @@ -511,29 +505,24 @@ public class TopologyAssign implements Runnable { Assignment assignment = null; if (assignments != null && assignments.size() > 0) { - Map<String, String> nodeHost = - getTopologyNodeHost(context.getCluster(), - context.getOldAssignment(), assignments); + Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments); - Map<Integer, Integer> startTimes = - getTaskStartTimes(context, nimbusData, topologyId, - context.getOldAssignment(), assignments); + Map<Integer, Integer> startTimes = getTaskStartTimes(context, nimbusData, topologyId, context.getOldAssignment(), assignments); - String codeDir = - StormConfig.masterStormdistRoot(nimbusData.getConf(), - topologyId); + String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId); - assignment = - new Assignment(codeDir, assignments, nodeHost, startTimes); + assignment = new Assignment(codeDir, assignments, nodeHost, startTimes); - StormClusterState stormClusterState = - nimbusData.getStormClusterState(); + // the topology binary changed. + if (event.isScaleTopology()){ + assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology); + } + StormClusterState stormClusterState = nimbusData.getStormClusterState(); stormClusterState.set_assignment(topologyId, assignment); // update task heartbeat's start time - NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, - topologyId); + NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId); // @@@ TODO @@ -547,14 +536,13 @@ public class TopologyAssign implements Runnable { NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId); - LOG.info("Successfully make assignment for topology id " - + topologyId + ": " + assignment); + LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment); } return assignment; } private static Set<ResourceWorkerSlot> mkLocalAssignment( - TopologyAssignContext context) { + TopologyAssignContext context) throws Exception { Set<ResourceWorkerSlot> result = new HashSet<ResourceWorkerSlot>(); Map<String, SupervisorInfo> cluster = context.getCluster(); if (cluster.size() != 1) @@ -565,7 +553,15 @@ public class TopologyAssign implements Runnable { supervisorId = entry.getKey(); localSupervisor = entry.getValue(); } - int port = localSupervisor.getAvailableWorkerPorts().iterator().next(); + int port = -1; + if (localSupervisor.getAvailableWorkerPorts().iterator().hasNext()) { + port = localSupervisor.getAvailableWorkerPorts().iterator().next(); + } else { + LOG.info(" amount of worker's ports is not enough"); + throw new FailedAssignTopologyException( + "Failed to make assignment " + ", due to no enough ports"); + } + ResourceWorkerSlot worker = new ResourceWorkerSlot(supervisorId, port); worker.setTasks(new HashSet<Integer>(context.getAllTaskIds())); worker.setHostname(localSupervisor.getHostName()); @@ -573,16 +569,8 @@ public class TopologyAssign implements Runnable { return result; } - /** - * @param existingAssignment - * @param taskWorkerSlot - * @return - * @throws Exception - */ - public static Map<Integer, Integer> getTaskStartTimes( - TopologyAssignContext context, NimbusData nimbusData, - String topologyId, Assignment existingAssignment, - Set<ResourceWorkerSlot> workers) throws Exception { + public static Map<Integer, Integer> getTaskStartTimes(TopologyAssignContext context, NimbusData nimbusData, String topologyId, + Assignment existingAssignment, Set<ResourceWorkerSlot> workers) throws Exception { Map<Integer, Integer> startTimes = new TreeMap<Integer, Integer>(); @@ -600,8 +588,7 @@ public class TopologyAssign implements Runnable { Set<ResourceWorkerSlot> oldWorkers = new HashSet<ResourceWorkerSlot>(); if (existingAssignment != null) { - Map<Integer, Integer> taskStartTimeSecs = - existingAssignment.getTaskStartTimeSecs(); + Map<Integer, Integer> taskStartTimeSecs = existingAssignment.getTaskStartTimeSecs(); if (taskStartTimeSecs != null) { startTimes.putAll(taskStartTimeSecs); } @@ -616,23 +603,21 @@ public class TopologyAssign implements Runnable { int nowSecs = TimeUtils.current_time_secs(); for (Integer changedTaskId : changedTaskIds) { startTimes.put(changedTaskId, nowSecs); - zkClusterState.remove_task_heartbeat(topologyId, changedTaskId); + NimbusUtils.removeTopologyTaskHb(nimbusData, topologyId, changedTaskId); } Set<Integer> removedTaskIds = getRemovedTaskIds(oldWorkers, workers); for (Integer removedTaskId : removedTaskIds) { startTimes.remove(removedTaskId); - zkClusterState.remove_task_heartbeat(topologyId, removedTaskId); + NimbusUtils.removeTopologyTaskHb(nimbusData, topologyId, removedTaskId); } - LOG.info("Task assignment has been changed: " + changedTaskIds - + ", removed tasks " + removedTaskIds); + LOG.info("Task assignment has been changed: " + changedTaskIds + ", removed tasks " + removedTaskIds); return startTimes; } - public static Map<String, String> getTopologyNodeHost( - Map<String, SupervisorInfo> supervisorMap, - Assignment existingAssignment, Set<ResourceWorkerSlot> workers) { + public static Map<String, String> getTopologyNodeHost(Map<String, SupervisorInfo> supervisorMap, Assignment existingAssignment, + Set<ResourceWorkerSlot> workers) { // the following is that remove unused node from allNodeHost Set<String> usedNodes = new HashSet<String>(); @@ -649,8 +634,7 @@ public class TopologyAssign implements Runnable { } // get alive supervisorMap Map<supervisorId, hostname> - Map<String, String> nodeHost = - SupervisorInfo.getNodeHost(supervisorMap); + Map<String, String> nodeHost = SupervisorInfo.getNodeHost(supervisorMap); if (nodeHost != null) { allNodeHost.putAll(nodeHost); } @@ -661,8 +645,7 @@ public class TopologyAssign implements Runnable { if (allNodeHost.containsKey(supervisorId)) { ret.put(supervisorId, allNodeHost.get(supervisorId)); } else { - LOG.warn("Node " + supervisorId - + " doesn't in the supervisor list"); + LOG.warn("Node " + supervisorId + " doesn't in the supervisor list"); } } @@ -672,16 +655,12 @@ public class TopologyAssign implements Runnable { /** * get all taskids which are assigned newly or reassigned * - * @param taskToWorkerSlot - * @param newtaskToWorkerSlot * @return Set<Integer> taskid which is assigned newly or reassigned */ - public static Set<Integer> getNewOrChangedTaskIds( - Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) { + public static Set<Integer> getNewOrChangedTaskIds(Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) { Set<Integer> rtn = new HashSet<Integer>(); - HashMap<String, ResourceWorkerSlot> workerPortMap = - HostPortToWorkerMap(oldWorkers); + HashMap<String, ResourceWorkerSlot> workerPortMap = HostPortToWorkerMap(oldWorkers); for (ResourceWorkerSlot worker : workers) { ResourceWorkerSlot oldWorker = workerPortMap.get(worker.getHostPort()); if (oldWorker != null) { @@ -691,14 +670,15 @@ public class TopologyAssign implements Runnable { rtn.add(task); } } else { - rtn.addAll(worker.getTasks()); + if (worker.getTasks() != null) { + rtn.addAll(worker.getTasks()); + } } } return rtn; } - public static Set<Integer> getRemovedTaskIds( - Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) { + public static Set<Integer> getRemovedTaskIds(Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) { Set<Integer> rtn = new HashSet<Integer>(); Set<Integer> oldTasks = getTaskSetFromWorkerSet(oldWorkers); @@ -711,8 +691,7 @@ public class TopologyAssign implements Runnable { return rtn; } - private static Set<Integer> getTaskSetFromWorkerSet( - Set<ResourceWorkerSlot> workers) { + private static Set<Integer> getTaskSetFromWorkerSet(Set<ResourceWorkerSlot> workers) { Set<Integer> rtn = new HashSet<Integer>(); for (ResourceWorkerSlot worker : workers) { rtn.addAll(worker.getTasks()); @@ -720,10 +699,8 @@ public class TopologyAssign implements Runnable { return rtn; } - private static HashMap<String, ResourceWorkerSlot> HostPortToWorkerMap( - Set<ResourceWorkerSlot> workers) { - HashMap<String, ResourceWorkerSlot> rtn = - new HashMap<String, ResourceWorkerSlot>(); + private static HashMap<String, ResourceWorkerSlot> HostPortToWorkerMap(Set<ResourceWorkerSlot> workers) { + HashMap<String, ResourceWorkerSlot> rtn = new HashMap<String, ResourceWorkerSlot>(); for (ResourceWorkerSlot worker : workers) { rtn.put(worker.getHostPort(), worker); } @@ -731,17 +708,14 @@ public class TopologyAssign implements Runnable { } /** - * sort slots, the purpose is to ensure that the tasks are assigned in - * balancing + * sort slots, the purpose is to ensure that the tasks are assigned in balancing * * @param allSlots * @return List<WorkerSlot> */ - public static List<WorkerSlot> sortSlots(Set<WorkerSlot> allSlots, - int needSlotNum) { + public static List<WorkerSlot> sortSlots(Set<WorkerSlot> allSlots, int needSlotNum) { - Map<String, List<WorkerSlot>> nodeMap = - new HashMap<String, List<WorkerSlot>>(); + Map<String, List<WorkerSlot>> nodeMap = new HashMap<String, List<WorkerSlot>>(); // group by first for (WorkerSlot np : allSlots) { @@ -778,8 +752,7 @@ public class TopologyAssign implements Runnable { } // interleave - List<List<WorkerSlot>> splitup = - new ArrayList<List<WorkerSlot>>(nodeMap.values()); + List<List<WorkerSlot>> splitup = new ArrayList<List<WorkerSlot>>(nodeMap.values()); Collections.sort(splitup, new Comparator<List<WorkerSlot>>() { @@ -801,13 +774,8 @@ public class TopologyAssign implements Runnable { /** * Get unstopped slots from alive task list - * - * @param aliveAssigned - * @param supInfos - * @return */ - public Set<Integer> getUnstoppedSlots(Set<Integer> aliveTasks, - Map<String, SupervisorInfo> supInfos, Assignment existAssignment) { + public Set<Integer> getUnstoppedSlots(Set<Integer> aliveTasks, Map<String, SupervisorInfo> supInfos, Assignment existAssignment) { Set<Integer> ret = new HashSet<Integer>(); Set<ResourceWorkerSlot> oldWorkers = existAssignment.getWorkers(); @@ -835,8 +803,7 @@ public class TopologyAssign implements Runnable { } - private Set<ResourceWorkerSlot> getUnstoppedWorkers( - Set<Integer> aliveTasks, Assignment existAssignment) { + private Set<ResourceWorkerSlot> getUnstoppedWorkers(Set<Integer> aliveTasks, Assignment existAssignment) { Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>(); for (ResourceWorkerSlot worker : existAssignment.getWorkers()) { boolean alive = true; @@ -860,12 +827,9 @@ public class TopologyAssign implements Runnable { * @param stormClusterState * @throws Exception */ - public static void getFreeSlots( - Map<String, SupervisorInfo> supervisorInfos, - StormClusterState stormClusterState) throws Exception { + public static void getFreeSlots(Map<String, SupervisorInfo> supervisorInfos, StormClusterState stormClusterState) throws Exception { - Map<String, Assignment> assignments = - Cluster.get_all_assignment(stormClusterState, null); + Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, null); for (Entry<String, Assignment> entry : assignments.entrySet()) { String topologyId = entry.getKey(); @@ -875,8 +839,7 @@ public class TopologyAssign implements Runnable { for (ResourceWorkerSlot worker : workers) { - SupervisorInfo supervisorInfo = - supervisorInfos.get(worker.getNodeId()); + SupervisorInfo supervisorInfo = supervisorInfos.get(worker.getNodeId()); if (supervisorInfo == null) { // the supervisor is dead continue; @@ -888,31 +851,20 @@ public class TopologyAssign implements Runnable { } /** - * find all alived taskid Does not assume that clocks are synchronized. Task - * heartbeat is only used so that nimbus knows when it's received a new - * heartbeat. All timing is done by nimbus and tracked through - * task-heartbeat-cache + * find all alived taskid Does not assume that clocks are synchronized. Task heartbeat is only used so that nimbus knows when it's received a new heartbeat. + * All timing is done by nimbus and tracked through task-heartbeat-cache * - * @param conf - * @param topologyId - * @param stormClusterState - * @param taskIds - * @param taskStartTimes - * @param taskHeartbeatsCache --Map<topologyId, Map<taskid, - * Map<tkHbCacheTime, time>>> * @return Set<Integer> : taskid * @throws Exception */ - public Set<Integer> getAliveTasks(String topologyId, Set<Integer> taskIds) - throws Exception { + public Set<Integer> getAliveTasks(String topologyId, Set<Integer> taskIds) throws Exception { Set<Integer> aliveTasks = new HashSet<Integer>(); // taskIds is the list from ZK /ZK-DIR/tasks/topologyId for (int taskId : taskIds) { - boolean isDead = - NimbusUtils.isTaskDead(nimbusData, topologyId, taskId); + boolean isDead = NimbusUtils.isTaskDead(nimbusData, topologyId, taskId); if (isDead == false) { aliveTasks.add(taskId); } @@ -925,24 +877,20 @@ public class TopologyAssign implements Runnable { /** * Backup the toplogy's Assignment to ZK * - * @@@ Question Do we need to do backup operation every time? * @param assignment * @param event + * @@@ Question Do we need to do backup operation every time? */ - public void backupAssignment(Assignment assignment, - TopologyAssignEvent event) { + public void backupAssignment(Assignment assignment, TopologyAssignEvent event) { String topologyId = event.getTopologyId(); String topologyName = event.getTopologyName(); try { - StormClusterState zkClusterState = - nimbusData.getStormClusterState(); + StormClusterState zkClusterState = nimbusData.getStormClusterState(); // one little problem, get tasks twice when assign one topology - Map<Integer, String> tasks = - Cluster.get_all_task_component(zkClusterState, topologyId, null); + Map<Integer, String> tasks = Cluster.get_all_task_component(zkClusterState, topologyId, null); - Map<String, List<Integer>> componentTasks = - JStormUtils.reverse_map(tasks); + Map<String, List<Integer>> componentTasks = JStormUtils.reverse_map(tasks); for (Entry<String, List<Integer>> entry : componentTasks.entrySet()) { List<Integer> keys = entry.getValue(); @@ -951,31 +899,24 @@ public class TopologyAssign implements Runnable { } - AssignmentBak assignmentBak = - new AssignmentBak(componentTasks, assignment); + AssignmentBak assignmentBak = new AssignmentBak(componentTasks, assignment); zkClusterState.backup_assignment(topologyName, assignmentBak); } catch (Exception e) { - LOG.warn("Failed to backup " + topologyId + " assignment " - + assignment, e); + LOG.warn("Failed to backup " + topologyId + " assignment " + assignment, e); } } - private void getAliveSupervsByHb( - Map<String, SupervisorInfo> supervisorInfos, Map conf) { + private void getAliveSupervsByHb(Map<String, SupervisorInfo> supervisorInfos, Map conf) { int currentTime = TimeUtils.current_time_secs(); - int hbTimeout = - JStormUtils.parseInt( - conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS), - (JStormUtils.MIN_1 * 3)); + int hbTimeout = JStormUtils.parseInt(conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS), (JStormUtils.MIN_1 * 3)); Set<String> supervisorTobeRemoved = new HashSet<String>(); for (Entry<String, SupervisorInfo> entry : supervisorInfos.entrySet()) { SupervisorInfo supInfo = entry.getValue(); int lastReportTime = supInfo.getTimeSecs(); if ((currentTime - lastReportTime) > hbTimeout) { - LOG.warn("Supervisor-" + supInfo.getHostName() - + " is dead. lastReportTime=" + lastReportTime); + LOG.warn("Supervisor-" + supInfo.getHostName() + " is dead. lastReportTime=" + lastReportTime); supervisorTobeRemoved.add(entry.getKey()); } } @@ -989,8 +930,6 @@ public class TopologyAssign implements Runnable { * @param args */ public static void main(String[] args) { - // TODO Auto-generated method stub - } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java index 8725918..a0bf9b9 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java @@ -25,7 +25,7 @@ import com.alibaba.jstorm.cluster.StormStatus; public class TopologyAssignEvent { // unit is minutes - private static final int DEFAULT_WAIT_TIME = 2; + private static final int DEFAULT_WAIT_TIME = 5; private String topologyId; private String topologyName; // if this field has been set, it is create private String group; @@ -37,6 +37,14 @@ public class TopologyAssignEvent { private CountDownLatch latch = new CountDownLatch(1); private boolean isSuccess = false; private String errorMsg; + private boolean isScaleTopology = false; + + public void setScaleTopology(boolean isScaleTopology){ + this.isScaleTopology = isScaleTopology; + } + public boolean isScaleTopology(){ + return isScaleTopology; + } public String getTopologyId() { return topologyId;
