http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java index f739087..0bb1bb7 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java @@ -32,7 +32,6 @@ import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo; import com.alibaba.jstorm.schedule.Assignment; import com.alibaba.jstorm.task.TaskInfo; import com.alibaba.jstorm.task.error.TaskError; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat; import com.alibaba.jstorm.utils.TimeUtils; /** @@ -61,6 +60,8 @@ public class Cluster { public static final String METRIC_ROOT = "metrics"; public static final String LAST_ERROR = "last_error"; + public static final String NIMBUS_SLAVE_DETAIL_ROOT= "nimbus_slave_detail"; + public static final String BACKPRESSURE_ROOT = "backpressure"; public static final String ASSIGNMENTS_SUBTREE; public static final String ASSIGNMENTS_BAK_SUBTREE; @@ -72,6 +73,8 @@ public class Cluster { public static final String MASTER_SUBTREE; public static final String NIMBUS_SLAVE_SUBTREE; public static final String METRIC_SUBTREE; + public static final String NIMBUS_SLAVE_DETAIL_SUBTREE; + public static final String BACKPRESSURE_SUBTREE; static { ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT; @@ -84,6 +87,8 @@ public class Cluster { MASTER_SUBTREE = ZK_SEPERATOR + MASTER_ROOT; NIMBUS_SLAVE_SUBTREE = ZK_SEPERATOR + NIMBUS_SLAVE_ROOT; METRIC_SUBTREE = ZK_SEPERATOR + METRIC_ROOT; + NIMBUS_SLAVE_DETAIL_SUBTREE = ZK_SEPERATOR + NIMBUS_SLAVE_DETAIL_ROOT; + BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT; } public static String supervisor_path(String id) { @@ -106,10 +111,6 @@ public class Cluster { return TASKBEATS_SUBTREE + ZK_SEPERATOR + topology_id; } - public static String taskbeat_path(String topology_id, int task_id) { - return taskbeat_storm_root(topology_id) + ZK_SEPERATOR + task_id; - } - public static String taskerror_storm_root(String topology_id) { return TASKERRORS_SUBTREE + ZK_SEPERATOR + topology_id; } @@ -130,97 +131,71 @@ public class Cluster { return ASSIGNMENTS_BAK_SUBTREE + ZK_SEPERATOR + id; } + public static String backpressure_path(String topology_id) { + return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + topology_id; + } + @SuppressWarnings("rawtypes") - public static StormClusterState mk_storm_cluster_state( - Map cluster_state_spec) throws Exception { + public static StormClusterState mk_storm_cluster_state(Map cluster_state_spec) throws Exception { return new StormZkClusterState(cluster_state_spec); } - public static StormClusterState mk_storm_cluster_state( - ClusterState cluster_state_spec) throws Exception { + public static StormClusterState mk_storm_cluster_state(ClusterState cluster_state_spec) throws Exception { return new StormZkClusterState(cluster_state_spec); } - public static Map<Integer, TaskInfo> get_all_taskInfo( - StormClusterState zkCluster, String topologyId) throws Exception { - return zkCluster.task_all_info(topologyId); + public static Map<Integer, TaskInfo> get_all_taskInfo(StormClusterState zkCluster, String topologyId) throws Exception { + return zkCluster.task_all_info(topologyId); } - - - public static Map<Integer, String> get_all_task_component( - StormClusterState zkCluster, String topologyId, - Map<Integer, TaskInfo> taskInfoMap) throws Exception { + + public static Map<Integer, String> get_all_task_component(StormClusterState zkCluster, String topologyId, Map<Integer, TaskInfo> taskInfoMap) + throws Exception { if (taskInfoMap == null) { taskInfoMap = get_all_taskInfo(zkCluster, topologyId); } - + if (taskInfoMap == null) { return null; } - + return Common.getTaskToComponent(taskInfoMap); } - - public static Map<Integer, String> get_all_task_type( - StormClusterState zkCluster, String topologyId, - Map<Integer, TaskInfo> taskInfoMap) throws Exception { + + public static Map<Integer, String> get_all_task_type(StormClusterState zkCluster, String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception { if (taskInfoMap == null) { taskInfoMap = get_all_taskInfo(zkCluster, topologyId); } - + if (taskInfoMap == null) { return null; } - - return Common.getTaskToType(taskInfoMap); - } - - public static Map<String, TaskHeartbeat> get_all_task_heartbeat( - StormClusterState zkCluster, String topologyId) throws Exception { - Map<String, TaskHeartbeat> ret = new HashMap<String, TaskHeartbeat>(); - - List<String> taskList = zkCluster.heartbeat_tasks(topologyId); - for (String taskId : taskList) { - TaskHeartbeat hb = - zkCluster.task_heartbeat(topologyId, - Integer.valueOf(taskId)); - if (hb == null) { - LOG.error("Failed to get hearbeat of " + topologyId + ":" - + taskId); - continue; - } - ret.put(taskId, hb); - } - - return ret; + return Common.getTaskToType(taskInfoMap); } /** - * if one topology's name equal the input storm_name, then return the - * topology id, otherwise return null + * if one topology's name equal the input storm_name, then return the topology id, otherwise return null * * @param zkCluster * @param storm_name * @return * @throws Exception */ - public static String get_topology_id(StormClusterState zkCluster, - String storm_name) throws Exception { + public static String get_topology_id(StormClusterState zkCluster, String storm_name) throws Exception { List<String> active_storms = zkCluster.active_storms(); String rtn = null; if (active_storms != null) { for (String topology_id : active_storms) { - + if (topology_id.indexOf(storm_name) < 0) { continue; } - - String zkTopologyName = Common.topologyIdToName(topology_id); - if (storm_name.endsWith(zkTopologyName)) { - return topology_id; + StormBase base = zkCluster.storm_base(topology_id, null); + if (base != null && storm_name.equals(Common.getTopologyNameById(topology_id))) { + rtn = topology_id; + break; } - + } } return rtn; @@ -233,8 +208,7 @@ public class Cluster { * @return <topology_id, StormBase> * @throws Exception */ - public static HashMap<String, StormBase> get_all_StormBase( - StormClusterState zkCluster) throws Exception { + public static HashMap<String, StormBase> get_all_StormBase(StormClusterState zkCluster) throws Exception { HashMap<String, StormBase> rtn = new HashMap<String, StormBase>(); List<String> active_storms = zkCluster.active_storms(); if (active_storms != null) { @@ -253,25 +227,20 @@ public class Cluster { * * @param stormClusterState * @param callback - * @return Map<String, SupervisorInfo> String: supervisorId SupervisorInfo: - * [time-secs hostname worker-ports uptime-secs] + * @return Map<String, SupervisorInfo> String: supervisorId SupervisorInfo: [time-secs hostname worker-ports uptime-secs] * @throws Exception */ - public static Map<String, SupervisorInfo> get_all_SupervisorInfo( - StormClusterState stormClusterState, RunnableCallback callback) - throws Exception { + public static Map<String, SupervisorInfo> get_all_SupervisorInfo(StormClusterState stormClusterState, RunnableCallback callback) throws Exception { Map<String, SupervisorInfo> rtn = new TreeMap<String, SupervisorInfo>(); // get /ZK/supervisors List<String> supervisorIds = stormClusterState.supervisors(callback); if (supervisorIds != null) { - for (Iterator<String> iter = supervisorIds.iterator(); iter - .hasNext();) { + for (Iterator<String> iter = supervisorIds.iterator(); iter.hasNext();) { String supervisorId = iter.next(); // get /supervisors/supervisorid - SupervisorInfo supervisorInfo = - stormClusterState.supervisor_info(supervisorId); + SupervisorInfo supervisorInfo = stormClusterState.supervisor_info(supervisorId); if (supervisorInfo == null) { LOG.warn("Failed to get SupervisorInfo of " + supervisorId); } else { @@ -286,9 +255,7 @@ public class Cluster { return rtn; } - public static Map<String, Assignment> get_all_assignment( - StormClusterState stormClusterState, RunnableCallback callback) - throws Exception { + public static Map<String, Assignment> get_all_assignment(StormClusterState stormClusterState, RunnableCallback callback) throws Exception { Map<String, Assignment> ret = new HashMap<String, Assignment>(); // get /assignments {topology_id} @@ -300,12 +267,10 @@ public class Cluster { for (String topology_id : assignments) { - Assignment assignment = - stormClusterState.assignment_info(topology_id, callback); + Assignment assignment = stormClusterState.assignment_info(topology_id, callback); if (assignment == null) { - LOG.error("Failed to get Assignment of " + topology_id - + " from ZK"); + LOG.error("Failed to get Assignment of " + topology_id + " from ZK"); continue; } @@ -315,8 +280,7 @@ public class Cluster { return ret; } - public static Map<String, String> get_all_nimbus_slave( - StormClusterState stormClusterState) throws Exception { + public static Map<String, String> get_all_nimbus_slave(StormClusterState stormClusterState) throws Exception { List<String> hosts = stormClusterState.get_nimbus_slaves(); if (hosts == null || hosts.size() == 0) { return null; @@ -331,11 +295,8 @@ public class Cluster { return ret; } - public static String get_supervisor_hostname( - StormClusterState stormClusterState, String supervisorId) - throws Exception { - SupervisorInfo supervisorInfo = - stormClusterState.supervisor_info(supervisorId); + public static String get_supervisor_hostname(StormClusterState stormClusterState, String supervisorId) throws Exception { + SupervisorInfo supervisorInfo = stormClusterState.supervisor_info(supervisorId); if (supervisorInfo == null) { return null; } else { @@ -343,12 +304,9 @@ public class Cluster { } } - public static boolean is_topology_exist_error( - StormClusterState stormClusterState, String topologyId) - throws Exception { + public static boolean is_topology_exist_error(StormClusterState stormClusterState, String topologyId) throws Exception { - Map<Integer, String> lastErrMap = - stormClusterState.topo_lastErr_time(topologyId); + Map<Integer, String> lastErrMap = stormClusterState.topo_lastErr_time(topologyId); if (lastErrMap == null || lastErrMap.size() == 0) { return false; } @@ -365,34 +323,33 @@ public class Cluster { return false; } - - public static Map<Integer, List<TaskError>> get_all_task_errors( - StormClusterState stormClusterState, String topologyId) { - Map<Integer, List<TaskError>> ret = new HashMap<Integer, List<TaskError>>(); - try { - List<String> errorTasks = stormClusterState.task_error_ids(topologyId); - if (errorTasks == null || errorTasks.size() == 0) { - return ret; - } - - for (String taskIdStr : errorTasks) { - Integer taskId = -1; - try { - taskId = Integer.valueOf(taskIdStr); - }catch(Exception e) { - // skip last_error - continue; - } - - List<TaskError> taskErrorList = stormClusterState.task_errors(topologyId, taskId); - ret.put(taskId, taskErrorList); - } - return ret; - } catch (Exception e) { - // TODO Auto-generated catch block - return ret; - } - - } + + public static Map<Integer, List<TaskError>> get_all_task_errors(StormClusterState stormClusterState, String topologyId) { + Map<Integer, List<TaskError>> ret = new HashMap<Integer, List<TaskError>>(); + try { + List<String> errorTasks = stormClusterState.task_error_ids(topologyId); + if (errorTasks == null || errorTasks.size() == 0) { + return ret; + } + + for (String taskIdStr : errorTasks) { + Integer taskId = -1; + try { + taskId = Integer.valueOf(taskIdStr); + } catch (Exception e) { + // skip last_error + continue; + } + + List<TaskError> taskErrorList = stormClusterState.task_errors(topologyId, taskId); + ret.put(taskId, taskErrorList); + } + return ret; + } catch (Exception e) { + // TODO Auto-generated catch block + return ret; + } + + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java index 8cba073..ad88717 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java @@ -39,8 +39,7 @@ public interface ClusterState { public byte[] get_data_sync(String path, boolean watch) throws Exception; - public List<String> get_children(String path, boolean watch) - throws Exception; + public List<String> get_children(String path, boolean watch) throws Exception; public void mkdirs(String path) throws Exception; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java index a9e3e0b..48528d7 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java @@ -17,35 +17,9 @@ */ package com.alibaba.jstorm.cluster; -import java.io.IOException; -import java.net.URLClassLoader; -import java.security.InvalidParameterException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.log4j.Logger; - import backtype.storm.Config; import backtype.storm.Constants; -import backtype.storm.generated.Bolt; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.ComponentObject; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.generated.Grouping; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.JavaObject; -import backtype.storm.generated.ShellComponent; -import backtype.storm.generated.SpoutSpec; -import backtype.storm.generated.StateSpoutSpec; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.StreamInfo; +import backtype.storm.generated.*; import backtype.storm.metric.SystemBolt; import backtype.storm.spout.ShellSpout; import backtype.storm.task.IBolt; @@ -54,17 +28,23 @@ import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import backtype.storm.utils.ThriftTopologyUtils; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.daemon.worker.WorkerData; import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyAssignContext; -import com.alibaba.jstorm.task.Task; import com.alibaba.jstorm.task.TaskInfo; import com.alibaba.jstorm.task.acker.Acker; import com.alibaba.jstorm.task.group.MkGrouper; +import com.alibaba.jstorm.task.master.TopologyMaster; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.Thrift; import com.alibaba.jstorm.utils.TimeUtils; import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URLClassLoader; +import java.security.InvalidParameterException; +import java.util.*; +import java.util.Map.Entry; /** * Base utility function @@ -75,14 +55,17 @@ import com.google.common.collect.Maps; * */ public class Common { - private final static Logger LOG = Logger.getLogger(Common.class); + private final static Logger LOG = LoggerFactory.getLogger(Common.class); + + public static final String TOPOLOGY_MASTER_COMPONENT_ID = "__topology_master"; + public static final String TOPOLOGY_MASTER_HB_STREAM_ID = "__master_task_heartbeat"; + public static final String TOPOLOGY_MASTER_METRICS_STREAM_ID = "__master_metrics"; + public static final String TOPOLOGY_MASTER_CONTROL_STREAM_ID = "__master_control_stream"; public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID; - public static final String ACKER_INIT_STREAM_ID = - Acker.ACKER_INIT_STREAM_ID; + public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID; public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID; - public static final String ACKER_FAIL_STREAM_ID = - Acker.ACKER_FAIL_STREAM_ID; + public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID; public static final String SYSTEM_STREAM_ID = "__system"; @@ -92,24 +75,20 @@ public class Common { public static final String LS_APPROVED_WORKERS = "approved-workers"; public static final String LS_TASK_CLEANUP_TIMEOUT = "task-cleanup-timeout"; - public static final String compErrorInfo = - "ID can only contains a-z, A-Z, 0-9, '-', '_', '.', '$', and should not start with \"__\"."; - public static final String nameErrorInfo = - "Name can only contains a-z, A-Z, 0-9, '-', '_', '.'"; + public static final String compErrorInfo = "ID can only contains a-z, A-Z, 0-9, '-', '_', '.', '$', and should not start with \"__\"."; + public static final String nameErrorInfo = "Name can only contains a-z, A-Z, 0-9, '-', '_', '.'"; public static boolean system_id(String id) { return Utils.isSystemId(id); } - private static void validate_component(Object obj) - throws InvalidTopologyException { + private static void validate_component(Object obj) throws InvalidTopologyException { if (obj instanceof StateSpoutSpec) { StateSpoutSpec spec = (StateSpoutSpec) obj; for (String id : spec.get_common().get_streams().keySet()) { if (system_id(id) || !charComponentValidate(id)) { - throw new InvalidTopologyException(id - + " is not a valid component id. " + compErrorInfo); + throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo); } } @@ -117,16 +96,14 @@ public class Common { SpoutSpec spec = (SpoutSpec) obj; for (String id : spec.get_common().get_streams().keySet()) { if (system_id(id) || !charComponentValidate(id)) { - throw new InvalidTopologyException(id - + " is not a valid component id. " + compErrorInfo); + throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo); } } } else if (obj instanceof Bolt) { Bolt spec = (Bolt) obj; for (String id : spec.get_common().get_streams().keySet()) { if (system_id(id) || !charComponentValidate(id)) { - throw new InvalidTopologyException(id - + " is not a valid component id. " + compErrorInfo); + throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo); } } } else { @@ -136,8 +113,7 @@ public class Common { } public static String topologyNameToId(String topologyName, int counter) { - return topologyName + "-" + counter + "-" - + TimeUtils.current_time_secs(); + return topologyName + "-" + counter + "-" + TimeUtils.current_time_secs(); } public static String getTopologyNameById(String topologyId) { @@ -151,14 +127,12 @@ public class Common { } /** - * Convert topologyId to topologyName. TopologyId = - * topoloygName-counter-timeStamp + * Convert topologyId to topologyName. TopologyId = topoloygName-counter-timeStamp * * @param topologyId * @return */ - public static String topologyIdToName(String topologyId) - throws InvalidTopologyException { + public static String topologyIdToName(String topologyId) throws InvalidTopologyException { String ret = null; int index = topologyId.lastIndexOf('-'); if (index != -1 && index > 2) { @@ -166,17 +140,14 @@ public class Common { if (index != -1 && index > 0) ret = topologyId.substring(0, index); else - throw new InvalidTopologyException(topologyId - + " is not a valid topologyId"); + throw new InvalidTopologyException(topologyId + " is not a valid topologyId"); } else - throw new InvalidTopologyException(topologyId - + " is not a valid topologyId"); + throw new InvalidTopologyException(topologyId + " is not a valid topologyId"); return ret; } /** - * Validation of topology name chars. Only alpha char, number, '-', '_', '.' - * are valid. + * Validation of topology name chars. Only alpha char, number, '-', '_', '.' are valid. * * @return */ @@ -185,8 +156,7 @@ public class Common { } /** - * Validation of topology component chars. Only alpha char, number, '-', - * '_', '.', '$' are valid. + * Validation of topology component chars. Only alpha char, number, '-', '_', '.', '$' are valid. * * @return */ @@ -201,12 +171,10 @@ public class Common { * @throws InvalidTopologyException */ @SuppressWarnings("unchecked") - public static void validate_ids(StormTopology topology, String topologyId) - throws InvalidTopologyException { + public static void validate_ids(StormTopology topology, String topologyId) throws InvalidTopologyException { String topologyName = topologyIdToName(topologyId); if (!charValidate(topologyName)) { - throw new InvalidTopologyException(topologyName - + " is not a valid topology name. " + nameErrorInfo); + throw new InvalidTopologyException(topologyName + " is not a valid topology name. " + nameErrorInfo); } List<String> list = new ArrayList<String>(); @@ -220,9 +188,7 @@ public class Common { for (String id : commids) { if (system_id(id) || !charComponentValidate(id)) { - throw new InvalidTopologyException(id - + " is not a valid component id. " - + compErrorInfo); + throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo); } } @@ -236,19 +202,16 @@ public class Common { List<String> offending = JStormUtils.getRepeat(list); if (offending.isEmpty() == false) { - throw new InvalidTopologyException("Duplicate component ids: " - + offending); + throw new InvalidTopologyException("Duplicate component ids: " + offending); } } - private static void validate_component_inputs(Object obj) - throws InvalidTopologyException { + private static void validate_component_inputs(Object obj) throws InvalidTopologyException { if (obj instanceof StateSpoutSpec) { StateSpoutSpec spec = (StateSpoutSpec) obj; if (!spec.get_common().get_inputs().isEmpty()) { - throw new InvalidTopologyException( - "May not declare inputs for a spout"); + throw new InvalidTopologyException("May not declare inputs for a spout"); } } @@ -256,22 +219,18 @@ public class Common { if (obj instanceof SpoutSpec) { SpoutSpec spec = (SpoutSpec) obj; if (!spec.get_common().get_inputs().isEmpty()) { - throw new InvalidTopologyException( - "May not declare inputs for a spout"); + throw new InvalidTopologyException("May not declare inputs for a spout"); } } } /** - * Validate the topology 1. component id name is valid or not 2. check some - * spout's input is empty or not + * Validate the topology 1. component id name is valid or not 2. check some spout's input is empty or not * * @param topology * @throws InvalidTopologyException */ - public static void validate_basic(StormTopology topology, - Map<Object, Object> totalStormConf, String topologyid) - throws InvalidTopologyException { + public static void validate_basic(StormTopology topology, Map<Object, Object> totalStormConf, String topologyid) throws InvalidTopologyException { validate_ids(topology, topologyid); for (StormTopology._Fields field : Thrift.SPOUT_FIELDS) { @@ -285,23 +244,15 @@ public class Common { } - Integer workerNum = - JStormUtils.parseInt(totalStormConf - .get(Config.TOPOLOGY_WORKERS)); + Integer workerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_WORKERS)); if (workerNum == null || workerNum <= 0) { - String errMsg = - "There are no Config.TOPOLOGY_WORKERS in configuration of " - + topologyid; + String errMsg = "There are no Config.TOPOLOGY_WORKERS in configuration of " + topologyid; throw new InvalidParameterException(errMsg); } - Integer ackerNum = - JStormUtils.parseInt(totalStormConf - .get(Config.TOPOLOGY_ACKER_EXECUTORS)); + Integer ackerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS)); if (ackerNum != null && ackerNum < 0) { - String errMsg = - "Invalide Config.TOPOLOGY_ACKERS in configuration of " - + topologyid; + String errMsg = "Invalide Config.TOPOLOGY_ACKERS in configuration of " + topologyid; throw new InvalidParameterException(errMsg); } @@ -310,23 +261,139 @@ public class Common { /** * Generate acker's input Map<GlobalStreamId, Grouping> * - * for spout <GlobalStreamId(spoutId, ACKER_INIT_STREAM_ID), ...> for bolt - * <GlobalStreamId(boltId, ACKER_ACK_STREAM_ID), ...> - * <GlobalStreamId(boltId, ACKER_FAIL_STREAM_ID), ...> + * for spout <GlobalStreamId(spoutId, ACKER_INIT_STREAM_ID), ...> for bolt <GlobalStreamId(boltId, ACKER_ACK_STREAM_ID), ...> <GlobalStreamId(boltId, + * ACKER_FAIL_STREAM_ID), ...> + * + * @param topology + * @return + */ + public static Map<GlobalStreamId, Grouping> topoMasterInputs(StormTopology topology) { + GlobalStreamId stream = null; + Grouping group = null; + + Map<GlobalStreamId, Grouping> spout_inputs = new HashMap<GlobalStreamId, Grouping>(); + Map<String, SpoutSpec> spout_ids = topology.get_spouts(); + for (Entry<String, SpoutSpec> spout : spout_ids.entrySet()) { + String id = spout.getKey(); + + stream = new GlobalStreamId(id, TOPOLOGY_MASTER_HB_STREAM_ID); + group = Thrift.mkAllGrouping(); + spout_inputs.put(stream, group); + + stream = new GlobalStreamId(id, TOPOLOGY_MASTER_METRICS_STREAM_ID); + group = Thrift.mkAllGrouping(); + spout_inputs.put(stream, group); + + stream = new GlobalStreamId(id, TOPOLOGY_MASTER_CONTROL_STREAM_ID); + group = Thrift.mkAllGrouping(); + spout_inputs.put(stream, group); + } + + Map<String, Bolt> bolt_ids = topology.get_bolts(); + Map<GlobalStreamId, Grouping> bolt_inputs = new HashMap<GlobalStreamId, Grouping>(); + for (Entry<String, Bolt> bolt : bolt_ids.entrySet()) { + String id = bolt.getKey(); + stream = new GlobalStreamId(id, TOPOLOGY_MASTER_HB_STREAM_ID); + group = Thrift.mkAllGrouping(); + bolt_inputs.put(stream, group); + + stream = new GlobalStreamId(id, TOPOLOGY_MASTER_METRICS_STREAM_ID); + group = Thrift.mkAllGrouping(); + bolt_inputs.put(stream, group); + + stream = new GlobalStreamId(id, TOPOLOGY_MASTER_CONTROL_STREAM_ID); + group = Thrift.mkAllGrouping(); + bolt_inputs.put(stream, group); + } + + Map<GlobalStreamId, Grouping> himself_inputs = new HashMap<GlobalStreamId, Grouping>(); + stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_HB_STREAM_ID); + group = Thrift.mkAllGrouping(); + himself_inputs.put(stream, group); + + stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_METRICS_STREAM_ID); + group = Thrift.mkAllGrouping(); + himself_inputs.put(stream, group); + + Map<GlobalStreamId, Grouping> allInputs = new HashMap<GlobalStreamId, Grouping>(); + allInputs.putAll(bolt_inputs); + allInputs.putAll(spout_inputs); + allInputs.putAll(himself_inputs); + return allInputs; + } + + /** + * Add topology master bolt to topology + */ + public static void addTopologyMaster(Map stormConf, StormTopology ret) { + // generate outputs + HashMap<String, StreamInfo> outputs = new HashMap<String, StreamInfo>(); + + List<String> list = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT); + outputs.put(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.outputFields(list)); + list = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS); + outputs.put(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.outputFields(list)); + list = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT); + outputs.put(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.outputFields(list)); + + IBolt topologyMaster = new TopologyMaster(); + + // generate inputs + Map<GlobalStreamId, Grouping> inputs = topoMasterInputs(ret); + + // generate topology master which will be stored in topology + Bolt topologyMasterBolt = Thrift.mkBolt(inputs, topologyMaster, outputs, 1); + + // add output stream to spout/bolt + for (Entry<String, Bolt> e : ret.get_bolts().entrySet()) { + Bolt bolt = e.getValue(); + ComponentCommon common = bolt.get_common(); + List<String> fields = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS); + common.put_to_streams(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.directOutputFields(fields)); + fields = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT); + common.put_to_streams(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.directOutputFields(fields)); + fields = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT); + common.put_to_streams(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.directOutputFields(fields)); + + GlobalStreamId stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_CONTROL_STREAM_ID); + common.put_to_inputs(stream, Thrift.mkDirectGrouping()); + bolt.set_common(common); + } + + for (Entry<String, SpoutSpec> kv : ret.get_spouts().entrySet()) { + SpoutSpec spout = kv.getValue(); + ComponentCommon common = spout.get_common(); + List<String> fields = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS); + common.put_to_streams(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.directOutputFields(fields)); + fields = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT); + common.put_to_streams(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.directOutputFields(fields)); + fields = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT); + common.put_to_streams(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.directOutputFields(fields)); + + GlobalStreamId stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_CONTROL_STREAM_ID); + common.put_to_inputs(stream, Thrift.mkDirectGrouping()); + spout.set_common(common); + } + + ret.put_to_bolts(TOPOLOGY_MASTER_COMPONENT_ID, topologyMasterBolt); + } + + /** + * Generate acker's input Map<GlobalStreamId, Grouping> + * + * for spout <GlobalStreamId(spoutId, ACKER_INIT_STREAM_ID), ...> for bolt <GlobalStreamId(boltId, ACKER_ACK_STREAM_ID), ...> <GlobalStreamId(boltId, + * ACKER_FAIL_STREAM_ID), ...> * * @param topology * @return */ - public static Map<GlobalStreamId, Grouping> acker_inputs( - StormTopology topology) { - Map<GlobalStreamId, Grouping> spout_inputs = - new HashMap<GlobalStreamId, Grouping>(); + public static Map<GlobalStreamId, Grouping> acker_inputs(StormTopology topology) { + Map<GlobalStreamId, Grouping> spout_inputs = new HashMap<GlobalStreamId, Grouping>(); Map<String, SpoutSpec> spout_ids = topology.get_spouts(); for (Entry<String, SpoutSpec> spout : spout_ids.entrySet()) { String id = spout.getKey(); - GlobalStreamId stream = - new GlobalStreamId(id, ACKER_INIT_STREAM_ID); + GlobalStreamId stream = new GlobalStreamId(id, ACKER_INIT_STREAM_ID); Grouping group = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id")); @@ -334,27 +401,21 @@ public class Common { } Map<String, Bolt> bolt_ids = topology.get_bolts(); - Map<GlobalStreamId, Grouping> bolt_inputs = - new HashMap<GlobalStreamId, Grouping>(); + Map<GlobalStreamId, Grouping> bolt_inputs = new HashMap<GlobalStreamId, Grouping>(); for (Entry<String, Bolt> bolt : bolt_ids.entrySet()) { String id = bolt.getKey(); - GlobalStreamId streamAck = - new GlobalStreamId(id, ACKER_ACK_STREAM_ID); - Grouping groupAck = - Thrift.mkFieldsGrouping(JStormUtils.mk_list("id")); + GlobalStreamId streamAck = new GlobalStreamId(id, ACKER_ACK_STREAM_ID); + Grouping groupAck = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id")); - GlobalStreamId streamFail = - new GlobalStreamId(id, ACKER_FAIL_STREAM_ID); - Grouping groupFail = - Thrift.mkFieldsGrouping(JStormUtils.mk_list("id")); + GlobalStreamId streamFail = new GlobalStreamId(id, ACKER_FAIL_STREAM_ID); + Grouping groupFail = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id")); bolt_inputs.put(streamAck, groupAck); bolt_inputs.put(streamFail, groupFail); } - Map<GlobalStreamId, Grouping> allInputs = - new HashMap<GlobalStreamId, Grouping>(); + Map<GlobalStreamId, Grouping> allInputs = new HashMap<GlobalStreamId, Grouping>(); allInputs.putAll(bolt_inputs); allInputs.putAll(spout_inputs); return allInputs; @@ -397,12 +458,10 @@ public class Common { List<String> ackList = JStormUtils.mk_list("id", "ack-val"); - common.put_to_streams(ACKER_ACK_STREAM_ID, - Thrift.outputFields(ackList)); + common.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(ackList)); List<String> failList = JStormUtils.mk_list("id"); - common.put_to_streams(ACKER_FAIL_STREAM_ID, - Thrift.outputFields(failList)); + common.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(failList)); bolt.set_common(common); } @@ -414,17 +473,13 @@ public class Common { for (Entry<String, SpoutSpec> kv : ret.get_spouts().entrySet()) { SpoutSpec bolt = kv.getValue(); ComponentCommon common = bolt.get_common(); - List<String> initList = - JStormUtils.mk_list("id", "init-val", "spout-task"); - common.put_to_streams(ACKER_INIT_STREAM_ID, - Thrift.outputFields(initList)); + List<String> initList = JStormUtils.mk_list("id", "init-val", "spout-task"); + common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(initList)); - GlobalStreamId ack_ack = - new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID); + GlobalStreamId ack_ack = new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID); common.put_to_inputs(ack_ack, Thrift.mkDirectGrouping()); - GlobalStreamId ack_fail = - new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID); + GlobalStreamId ack_fail = new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID); common.put_to_inputs(ack_fail, Thrift.mkDirectGrouping()); } @@ -480,26 +535,21 @@ public class Common { public static StormTopology add_system_components(StormTopology topology) { // generate inputs - Map<GlobalStreamId, Grouping> inputs = - new HashMap<GlobalStreamId, Grouping>(); + Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>(); // generate outputs HashMap<String, StreamInfo> outputs = new HashMap<String, StreamInfo>(); ArrayList<String> fields = new ArrayList<String>(); - outputs.put(Constants.SYSTEM_TICK_STREAM_ID, - Thrift.outputFields(JStormUtils.mk_list("rate_secs"))); - outputs.put(Constants.METRICS_TICK_STREAM_ID, - Thrift.outputFields(JStormUtils.mk_list("interval"))); - outputs.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, - Thrift.outputFields(JStormUtils.mk_list("creds"))); + outputs.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("rate_secs"))); + outputs.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("interval"))); + outputs.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("creds"))); // ComponentCommon common = new ComponentCommon(inputs, outputs); IBolt ackerbolt = new SystemBolt(); - Bolt bolt = - Thrift.mkBolt(inputs, ackerbolt, outputs, Integer.valueOf(0)); + Bolt bolt = Thrift.mkBolt(inputs, ackerbolt, outputs, Integer.valueOf(0)); topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, bolt); @@ -539,13 +589,15 @@ public class Common { } @SuppressWarnings("rawtypes") - public static StormTopology system_topology(Map storm_conf, - StormTopology topology) throws InvalidTopologyException { + public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException { StormTopology ret = topology.deepCopy(); add_acker(storm_conf, ret); + if(StormConfig.local_mode(storm_conf) == false) + addTopologyMaster(storm_conf, ret); + add_metrics_component(ret); add_system_components(ret); @@ -562,8 +614,7 @@ public class Common { * @return */ @SuppressWarnings("unchecked") - public static Map component_conf(Map storm_conf, - TopologyContext topology_context, String component_id) { + public static Map component_conf(Map storm_conf, TopologyContext topology_context, String component_id) { List<Object> to_remove = StormConfig.All_CONFIGS(); to_remove.remove(Config.TOPOLOGY_DEBUG); to_remove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING); @@ -572,16 +623,13 @@ public class Common { Map<Object, Object> componentConf = new HashMap<Object, Object>(); - String jconf = - topology_context.getComponentCommon(component_id) - .get_json_conf(); + String jconf = topology_context.getComponentCommon(component_id).get_json_conf(); if (jconf != null) { componentConf = (Map<Object, Object>) JStormUtils.from_json(jconf); } /** - * @@@ Don't know why need remove system configuration from component - * conf? // + * @@@ Don't know why need remove system configuration from component conf? // */ // for (Object p : to_remove) { // componentConf.remove(p); @@ -601,8 +649,7 @@ public class Common { * @param component_id * @return */ - public static Object get_task_object(StormTopology topology, - String component_id, URLClassLoader loader) { + public static Object get_task_object(StormTopology topology, String component_id, URLClassLoader loader) { Map<String, SpoutSpec> spouts = topology.get_spouts(); Map<String, Bolt> bolts = topology.get_bolts(); Map<String, StateSpoutSpec> state_spouts = topology.get_state_spouts(); @@ -617,8 +664,7 @@ public class Common { } if (obj == null) { - throw new RuntimeException("Could not find " + component_id - + " in " + topology.toString()); + throw new RuntimeException("Could not find " + component_id + " in " + topology.toString()); } Object componentObject = Utils.getSetComponentObject(obj, loader); @@ -646,43 +692,34 @@ public class Common { * @param topology_context * @return */ - public static Map<String, Map<String, MkGrouper>> outbound_components( - TopologyContext topology_context, WorkerData workerData) { - Map<String, Map<String, MkGrouper>> rr = - new HashMap<String, Map<String, MkGrouper>>(); + public static Map<String, Map<String, MkGrouper>> outbound_components(TopologyContext topology_context, WorkerData workerData) { + Map<String, Map<String, MkGrouper>> rr = new HashMap<String, Map<String, MkGrouper>>(); // <Stream_id,<component,Grouping>> - Map<String, Map<String, Grouping>> output_groupings = - topology_context.getThisTargets(); + Map<String, Map<String, Grouping>> output_groupings = topology_context.getThisTargets(); - for (Entry<String, Map<String, Grouping>> entry : output_groupings - .entrySet()) { + for (Entry<String, Map<String, Grouping>> entry : output_groupings.entrySet()) { String stream_id = entry.getKey(); Map<String, Grouping> component_grouping = entry.getValue(); Fields out_fields = topology_context.getThisOutputFields(stream_id); - Map<String, MkGrouper> componentGrouper = - new HashMap<String, MkGrouper>(); + Map<String, MkGrouper> componentGrouper = new HashMap<String, MkGrouper>(); for (Entry<String, Grouping> cg : component_grouping.entrySet()) { String component = cg.getKey(); Grouping tgrouping = cg.getValue(); - List<Integer> outTasks = - topology_context.getComponentTasks(component); + List<Integer> outTasks = topology_context.getComponentTasks(component); // ATTENTION: If topology set one component parallelism as 0 // so we don't need send tuple to it if (outTasks.size() > 0) { - MkGrouper grouper = - new MkGrouper(topology_context, out_fields, - tgrouping, outTasks, stream_id, workerData); + MkGrouper grouper = new MkGrouper(topology_context, out_fields, tgrouping, outTasks, stream_id, workerData); componentGrouper.put(component, grouper); } - LOG.info("outbound_components, outTasks=" + outTasks - + " for task-" + topology_context.getThisTaskId()); + LOG.info("outbound_components, outTasks=" + outTasks + " for task-" + topology_context.getThisTaskId()); } if (componentGrouper.size() > 0) { rr.put(stream_id, componentGrouper); @@ -696,17 +733,12 @@ public class Common { * * @param topology_context * @param task_id - * @return component's configurations */ - public static Map getComponentMap(DefaultTopologyAssignContext context, - Integer task) { + public static Map getComponentMap(DefaultTopologyAssignContext context, Integer task) { String componentName = context.getTaskToComponent().get(task); - ComponentCommon componentCommon = - ThriftTopologyUtils.getComponentCommon( - context.getSysTopology(), componentName); + ComponentCommon componentCommon = ThriftTopologyUtils.getComponentCommon(context.getSysTopology(), componentName); - Map componentMap = - (Map) JStormUtils.from_json(componentCommon.get_json_conf()); + Map componentMap = (Map) JStormUtils.from_json(componentCommon.get_json_conf()); if (componentMap == null) { componentMap = Maps.newHashMap(); } @@ -714,22 +746,17 @@ public class Common { } /** - * get all bolts' inputs and spouts' outputs <Bolt_name, <Input_name>> - * <Spout_name, <Output_name>> + * get all bolts' inputs and spouts' outputs <Bolt_name, <Input_name>> <Spout_name, <Output_name>> * * @param topology_context * @return all bolts' inputs and spouts' outputs */ - public static Map<String, Set<String>> buildSpoutOutoputAndBoltInputMap( - DefaultTopologyAssignContext context) { + public static Map<String, Set<String>> buildSpoutOutoputAndBoltInputMap(DefaultTopologyAssignContext context) { Set<String> bolts = context.getRawTopology().get_bolts().keySet(); Set<String> spouts = context.getRawTopology().get_spouts().keySet(); - Map<String, Set<String>> relationship = - new HashMap<String, Set<String>>(); - for (Entry<String, Bolt> entry : context.getRawTopology().get_bolts() - .entrySet()) { - Map<GlobalStreamId, Grouping> inputs = - entry.getValue().get_common().get_inputs(); + Map<String, Set<String>> relationship = new HashMap<String, Set<String>>(); + for (Entry<String, Bolt> entry : context.getRawTopology().get_bolts().entrySet()) { + Map<GlobalStreamId, Grouping> inputs = entry.getValue().get_common().get_inputs(); Set<String> input = new HashSet<String>(); relationship.put(entry.getKey(), input); for (Entry<GlobalStreamId, Grouping> inEntry : inputs.entrySet()) { @@ -759,37 +786,34 @@ public class Common { public static Map<Integer, String> getTaskToComponent(Map<Integer, TaskInfo> taskInfoMap) { Map<Integer, String> ret = new TreeMap<Integer, String>(); - for (Entry<Integer, TaskInfo> entry :taskInfoMap.entrySet()) { + for (Entry<Integer, TaskInfo> entry : taskInfoMap.entrySet()) { ret.put(entry.getKey(), entry.getValue().getComponentId()); } - + return ret; } - + public static Map<Integer, String> getTaskToType(Map<Integer, TaskInfo> taskInfoMap) { Map<Integer, String> ret = new TreeMap<Integer, String>(); - for (Entry<Integer, TaskInfo> entry :taskInfoMap.entrySet()) { + for (Entry<Integer, TaskInfo> entry : taskInfoMap.entrySet()) { ret.put(entry.getKey(), entry.getValue().getComponentType()); } - + return ret; } - - @SuppressWarnings({"rawtypes", "unchecked"}) - public static Integer mkTaskMaker(Map<Object, Object> stormConf, - Map<String, ?> cidSpec, - Map<Integer, TaskInfo> rtn, - Integer cnt) { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Integer mkTaskMaker(Map<Object, Object> stormConf, Map<String, ?> cidSpec, Map<Integer, TaskInfo> rtn, Integer cnt) { if (cidSpec == null) { LOG.warn("Component map is empty"); return cnt; } - + Set<?> entrySet = cidSpec.entrySet(); for (Iterator<?> it = entrySet.iterator(); it.hasNext();) { Entry entry = (Entry) it.next(); Object obj = entry.getValue(); - + ComponentCommon common = null; String componentType = "bolt"; if (obj instanceof Bolt) { @@ -802,22 +826,22 @@ public class Common { common = ((StateSpoutSpec) obj).get_common(); componentType = "spout"; } - + if (common == null) { throw new RuntimeException("No ComponentCommon of " + entry.getKey()); } - + int declared = Thrift.parallelismHint(common); Integer parallelism = declared; // Map tmp = (Map) Utils_clj.from_json(common.get_json_conf()); - + Map newStormConf = new HashMap(stormConf); // newStormConf.putAll(tmp); Integer maxParallelism = JStormUtils.parseInt(newStormConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM)); if (maxParallelism != null) { parallelism = Math.min(maxParallelism, declared); } - + for (int i = 0; i < parallelism; i++) { cnt++; TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), componentType); @@ -826,20 +850,24 @@ public class Common { } return cnt; } - - public static Map<Integer, TaskInfo> mkTaskInfo( - Map<Object, Object> stormConf, - StormTopology sysTopology, - String topologyid) { - + + public static Map<Integer, TaskInfo> mkTaskInfo(Map<Object, Object> stormConf, StormTopology sysTopology, String topologyid) { + // use TreeMap to make task as sequence Map<Integer, TaskInfo> rtn = new TreeMap<Integer, TaskInfo>(); - + Integer count = 0; count = mkTaskMaker(stormConf, sysTopology.get_bolts(), rtn, count); count = mkTaskMaker(stormConf, sysTopology.get_spouts(), rtn, count); count = mkTaskMaker(stormConf, sysTopology.get_state_spouts(), rtn, count); - + return rtn; } + + public static boolean isSystemComponent(String componentId) { + if (componentId.equals(Acker.ACKER_COMPONENT_ID) || componentId.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java index 2ebce83..3d25a25 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java @@ -49,8 +49,7 @@ import com.alibaba.jstorm.zk.Zookeeper; */ public class DistributedClusterState implements ClusterState { - private static Logger LOG = LoggerFactory - .getLogger(DistributedClusterState.class); + private static Logger LOG = LoggerFactory.getLogger(DistributedClusterState.class); private Zookeeper zkobj = new Zookeeper(); private CuratorFramework zk; @@ -59,8 +58,7 @@ public class DistributedClusterState implements ClusterState { /** * why run all callbacks, when receive one event */ - private ConcurrentHashMap<UUID, ClusterStateCallback> callbacks = - new ConcurrentHashMap<UUID, ClusterStateCallback>(); + private ConcurrentHashMap<UUID, ClusterStateCallback> callbacks = new ConcurrentHashMap<UUID, ClusterStateCallback>(); private Map<Object, Object> conf; private AtomicBoolean active; @@ -83,16 +81,13 @@ public class DistributedClusterState implements ClusterState { public void execute(KeeperState state, EventType type, String path) { if (active.get()) { if (!(state.equals(KeeperState.SyncConnected))) { - LOG.warn("Received event " + state + ":" + type + ":" - + path + " with disconnected Zookeeper."); + LOG.warn("Received event " + state + ":" + type + ":" + path + " with disconnected Zookeeper."); } else { - LOG.info("Received event " + state + ":" + type + ":" - + path); + LOG.info("Received event " + state + ":" + type + ":" + path); } if (!type.equals(EventType.None)) { - for (Entry<UUID, ClusterStateCallback> e : callbacks - .entrySet()) { + for (Entry<UUID, ClusterStateCallback> e : callbacks.entrySet()) { ClusterStateCallback fn = e.getValue(); fn.execute(type, path); } @@ -107,17 +102,12 @@ public class DistributedClusterState implements ClusterState { @SuppressWarnings("unchecked") private CuratorFramework mkZk() throws IOException { - return zkobj.mkClient(conf, - (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), - conf.get(Config.STORM_ZOOKEEPER_PORT), ""); + return zkobj.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), ""); } @SuppressWarnings("unchecked") - private CuratorFramework mkZk(WatcherCallBack watcher) - throws NumberFormatException, IOException { - return zkobj.mkClient(conf, - (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), - conf.get(Config.STORM_ZOOKEEPER_PORT), + private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException { + return zkobj.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher); } @@ -136,8 +126,7 @@ public class DistributedClusterState implements ClusterState { } @Override - public List<String> get_children(String path, boolean watch) - throws Exception { + public List<String> get_children(String path, boolean watch) throws Exception { return zkobj.getChildren(zk, path, watch); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java index e6438dd..6923ab5 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java @@ -37,8 +37,7 @@ public class StormBase implements Serializable { private boolean enableMonitor = true; private String group; - public StormBase(String stormName, int lanchTimeSecs, StormStatus status, - String group) { + public StormBase(String stormName, int lanchTimeSecs, StormStatus status, String group) { this.stormName = stormName; this.lanchTimeSecs = lanchTimeSecs; this.status = status; @@ -98,9 +97,7 @@ public class StormBase implements Serializable { result = prime * result + ((group == null) ? 0 : group.hashCode()); result = prime * result + lanchTimeSecs; result = prime * result + ((status == null) ? 0 : status.hashCode()); - result = - prime * result - + ((stormName == null) ? 0 : stormName.hashCode()); + result = prime * result + ((stormName == null) ? 0 : stormName.hashCode()); return result; } @@ -137,8 +134,7 @@ public class StormBase implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java index 6486d5e..a399bb9 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java @@ -27,7 +27,10 @@ import com.alibaba.jstorm.schedule.Assignment; import com.alibaba.jstorm.schedule.AssignmentBak; import com.alibaba.jstorm.task.TaskInfo; import com.alibaba.jstorm.task.error.TaskError; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat; +import com.alibaba.jstorm.task.backpressure.SourceBackpressureInfo; +import com.alibaba.jstorm.utils.Pair; + +import backtype.storm.generated.TopologyTaskHbInfo; /** * all storm in zk operation interface @@ -41,30 +44,23 @@ public interface StormClusterState { public List<String> assignments(RunnableCallback callback) throws Exception; - public Assignment assignment_info(String topology_id, - RunnableCallback callback) throws Exception; + public Assignment assignment_info(String topology_id, RunnableCallback callback) throws Exception; - public void set_assignment(String topology_id, Assignment info) - throws Exception; + public void set_assignment(String topology_id, Assignment info) throws Exception; public AssignmentBak assignment_bak(String topologyName) throws Exception; - public void backup_assignment(String topology_id, AssignmentBak info) - throws Exception; + public void backup_assignment(String topology_id, AssignmentBak info) throws Exception; public List<String> active_storms() throws Exception; - public StormBase storm_base(String topology_id, RunnableCallback callback) - throws Exception; + public StormBase storm_base(String topology_id, RunnableCallback callback) throws Exception; - public void activate_storm(String topology_id, StormBase storm_base) - throws Exception; + public void activate_storm(String topology_id, StormBase storm_base) throws Exception; - public void update_storm(String topology_id, StormStatus new_elems) - throws Exception; + public void update_storm(String topology_id, StormStatus new_elems) throws Exception; - public void set_storm_monitor(String topologyId, boolean isEnable) - throws Exception; + public void set_storm_monitor(String topologyId, boolean isEnable) throws Exception; public void remove_storm_base(String topology_id) throws Exception; @@ -72,73 +68,53 @@ public interface StormClusterState { public Set<Integer> task_ids(String topology_id) throws Exception; - public Set<Integer> task_ids_by_componentId(String topologyId, - String componentId) throws Exception; + public Set<Integer> task_ids_by_componentId(String topologyId, String componentId) throws Exception; public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception; - public void add_task(String topology_id, Map<Integer, TaskInfo> taskInfoMap) - throws Exception; + + public void add_task(String topology_id, Map<Integer, TaskInfo> taskInfoMap) throws Exception; public void remove_task(String topologyId, Set<Integer> taskIds) throws Exception; public Map<Integer, TaskInfo> task_all_info(String topology_id) throws Exception; - public void setup_heartbeats(String topology_id) throws Exception; - public List<String> heartbeat_storms() throws Exception; - public List<String> heartbeat_tasks(String topology_id) throws Exception; - - public TaskHeartbeat task_heartbeat(String topology_id, int task_id) - throws Exception; + public void topology_heartbeat(String topology_id, TopologyTaskHbInfo info) throws Exception; - public void task_heartbeat(String topology_id, int task_id, - TaskHeartbeat info) throws Exception; + public TopologyTaskHbInfo topology_heartbeat(String topologyId) throws Exception; public void teardown_heartbeats(String topology_id) throws Exception; - public void remove_task_heartbeat(String topology_id, int task_id) - throws Exception; - public List<String> task_error_storms() throws Exception; - + public List<String> task_error_ids(String topologyId) throws Exception; - public void report_task_error(String topology_id, int task_id, - Throwable error) throws Exception; + public void report_task_error(String topology_id, int task_id, Throwable error) throws Exception; - public void report_task_error(String topology_id, int task_id, String error) - throws Exception; + public void report_task_error(String topology_id, int task_id, String error, String tag) throws Exception; - public Map<Integer, String> topo_lastErr_time(String topologyId) - throws Exception; + public Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception; public void remove_lastErr_time(String topologyId) throws Exception; - public List<TaskError> task_errors(String topology_id, int task_id) - throws Exception; + public List<TaskError> task_errors(String topology_id, int task_id) throws Exception; - public void remove_task_error(String topologyId, int taskId) - throws Exception; + public void remove_task_error(String topologyId, int taskId) throws Exception; - public List<String> task_error_time(String topologyId, int taskId) - throws Exception; + public List<String> task_error_time(String topologyId, int taskId) throws Exception; - public String task_error_info(String topologyId, int taskId, long timeStamp) - throws Exception; + public String task_error_info(String topologyId, int taskId, long timeStamp) throws Exception; public void teardown_task_errors(String topology_id) throws Exception; public List<String> supervisors(RunnableCallback callback) throws Exception; - public SupervisorInfo supervisor_info(String supervisor_id) - throws Exception; + public SupervisorInfo supervisor_info(String supervisor_id) throws Exception; - public void supervisor_heartbeat(String supervisor_id, SupervisorInfo info) - throws Exception; + public void supervisor_heartbeat(String supervisor_id, SupervisorInfo info) throws Exception; - public boolean try_to_be_leader(String path, String host, - RunnableCallback callback) throws Exception; + public boolean try_to_be_leader(String path, String host, RunnableCallback callback) throws Exception; public String get_leader_host() throws Exception; @@ -152,11 +128,25 @@ public interface StormClusterState { public void unregister_nimbus_host(String host) throws Exception; - public void set_topology_metric(String topologyId, Object metric) - throws Exception; + public void update_nimbus_detail(String hostPort, Map map) throws Exception; + + public Map get_nimbus_detail(String hostPort, boolean watch) throws Exception; + + public void unregister_nimbus_detail(String hostPort) throws Exception; + + public void set_topology_metric(String topologyId, Object metric) throws Exception; public Object get_topology_metric(String topologyId) throws Exception; - + public List<String> get_metrics() throws Exception; + public List<String> list_dirs(String path, boolean watch) throws Exception; + + public List<String> backpressureInfos() throws Exception; + + public void set_backpressure_info(String topologyId, Map<String, SourceBackpressureInfo> sourceToBackpressureInfo) throws Exception; + + public Map<String, SourceBackpressureInfo> get_backpressure_info(String topologyId) throws Exception; + + public void teardown_backpressure(String topologyId) throws Exception; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java index 3d1cd29..f78f52a 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java @@ -17,8 +17,18 @@ */ package com.alibaba.jstorm.cluster; +import backtype.storm.Config; +import backtype.storm.generated.StormTopology; +import backtype.storm.utils.LocalState; +import backtype.storm.utils.Utils; +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.PathUtils; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; @@ -26,21 +36,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; -import backtype.storm.generated.StormTopology; -import backtype.storm.utils.LocalState; -import backtype.storm.utils.Utils; - -import com.alibaba.jstorm.utils.JStormUtils; -import com.alibaba.jstorm.utils.PathUtils; - public class StormConfig { - private final static Logger LOG = LoggerFactory - .getLogger(StormConfig.class); + private final static Logger LOG = LoggerFactory.getLogger(StormConfig.class); public final static String RESOURCES_SUBDIR = "resources"; public final static String WORKER_DATA_SUBDIR = "worker_shared_data"; @@ -80,11 +77,10 @@ public class StormConfig { return rtn; } - public static HashMap<String, Object> getClassFields(Class<?> cls) - throws IllegalArgumentException, IllegalAccessException { - java.lang.reflect.Field[] list = cls.getDeclaredFields(); + public static HashMap<String, Object> getClassFields(Class<?> cls) throws IllegalArgumentException, IllegalAccessException { + Field[] list = cls.getDeclaredFields(); HashMap<String, Object> rtn = new HashMap<String, Object>(); - for (java.lang.reflect.Field f : list) { + for (Field f : list) { String name = f.getName(); rtn.put(name, f.get(null).toString()); @@ -98,19 +94,26 @@ public class StormConfig { } + /** + * please use ConfigExtension.getClusterName(Map conf) + */ + @Deprecated + public static String cluster_name(Map conf) { + return ConfigExtension.getClusterName(conf); + } + public static boolean local_mode(Map conf) { String mode = (String) conf.get(Config.STORM_CLUSTER_MODE); if (mode != null) { - if (mode.equals("local")) { + if ("local".equals(mode)) { return true; } - if (mode.equals("distributed")) { + if ("distributed".equals(mode)) { return false; } } - throw new IllegalArgumentException("Illegal cluster mode in conf:" - + mode); + throw new IllegalArgumentException("Illegal cluster mode in conf:" + mode); } @@ -121,24 +124,20 @@ public class StormConfig { */ public static void validate_distributed_mode(Map<?, ?> conf) { if (StormConfig.local_mode(conf)) { - throw new IllegalArgumentException( - "Cannot start server in local mode!"); + throw new IllegalArgumentException("Cannot start server in local mode!"); } } public static void validate_local_mode(Map<?, ?> conf) { if (!StormConfig.local_mode(conf)) { - throw new IllegalArgumentException( - "Cannot start server in distributed mode!"); + throw new IllegalArgumentException("Cannot start server in distributed mode!"); } } public static String worker_root(Map conf) throws IOException { - String ret = - String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) - + FILE_SEPERATEOR + "workers"; + String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "workers"; FileUtils.forceMkdir(new File(ret)); return ret; } @@ -149,39 +148,38 @@ public class StormConfig { return ret; } - public static String worker_pids_root(Map conf, String id) - throws IOException { + public static String worker_pids_root(Map conf, String id) throws IOException { String ret = worker_root(conf, id) + FILE_SEPERATEOR + "pids"; FileUtils.forceMkdir(new File(ret)); return ret; } - public static String worker_pid_path(Map conf, String id, String pid) - throws IOException { + public static String worker_pid_path(Map conf, String id, String pid) throws IOException { String ret = worker_pids_root(conf, id) + FILE_SEPERATEOR + pid; return ret; } - public static String worker_heartbeats_root(Map conf, String id) - throws IOException { + public static String worker_heartbeats_root(Map conf, String id) throws IOException { String ret = worker_root(conf, id) + FILE_SEPERATEOR + "heartbeats"; FileUtils.forceMkdir(new File(ret)); return ret; } public static String default_worker_shared_dir(Map conf) throws IOException { - String ret = - String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) - + FILE_SEPERATEOR + WORKER_DATA_SUBDIR; + String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + WORKER_DATA_SUBDIR; FileUtils.forceMkdir(new File(ret)); return ret; } + private static String drpc_local_dir(Map conf) throws IOException { + String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "drpc"; + FileUtils.forceMkdir(new File(ret)); + return ret; + } + private static String supervisor_local_dir(Map conf) throws IOException { - String ret = - String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) - + FILE_SEPERATEOR + "supervisor"; + String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "supervisor"; FileUtils.forceMkdir(new File(ret)); return ret; } @@ -192,8 +190,7 @@ public class StormConfig { return ret; } - public static String supervisor_stormdist_root(Map conf, String topologyId) - throws IOException { + public static String supervisor_stormdist_root(Map conf, String topologyId) throws IOException { return supervisor_stormdist_root(conf) + FILE_SEPERATEOR + topologyId; } @@ -216,17 +213,32 @@ public class StormConfig { } /** + * Return drpc's pid dir + * + * @param conf + * @return + * @throws IOException + */ + public static String drpcPids(Map conf) throws IOException { + String ret = drpc_local_dir(conf) + FILE_SEPERATEOR + "pids"; + try { + FileUtils.forceMkdir(new File(ret)); + } catch (IOException e) { + LOG.error("Failed to create dir " + ret, e); + throw e; + } + return ret; + } + + /** * Return nimbus's heartbeat dir for apsara * * @param conf * @return * @throws IOException */ - public static String supervisorHearbeatForContainer(Map conf) - throws IOException { - String ret = - supervisor_local_dir(conf) + FILE_SEPERATEOR - + "supervisor.heartbeat"; + public static String supervisorHearbeatForContainer(Map conf) throws IOException { + String ret = supervisor_local_dir(conf) + FILE_SEPERATEOR + "supervisor.heartbeat"; try { FileUtils.forceMkdir(new File(ret)); } catch (IOException e) { @@ -272,8 +284,7 @@ public class StormConfig { return stormroot + FILE_SEPERATEOR + "timestamp"; } - public static LocalState worker_state(Map conf, String id) - throws IOException { + public static LocalState worker_state(Map conf, String id) throws IOException { String path = worker_heartbeats_root(conf, id); LocalState rtn = new LocalState(path); @@ -282,9 +293,18 @@ public class StormConfig { } public static String masterLocalDir(Map conf) throws IOException { - String ret = - String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) - + FILE_SEPERATEOR + "nimbus"; + String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "nimbus"; + try { + FileUtils.forceMkdir(new File(ret)); + } catch (IOException e) { + LOG.error("Failed to create dir " + ret, e); + throw e; + } + return ret; + } + + public static String metricLocalDir(Map conf) throws IOException { + String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "metrics"; try { FileUtils.forceMkdir(new File(ret)); } catch (IOException e) { @@ -300,8 +320,7 @@ public class StormConfig { return ret; } - public static String masterStormdistRoot(Map conf, String topologyId) - throws IOException { + public static String masterStormdistRoot(Map conf, String topologyId) throws IOException { return masterStormdistRoot(conf) + FILE_SEPERATEOR + topologyId; } @@ -311,8 +330,7 @@ public class StormConfig { return ret; } - public static String masterStormTmpRoot(Map conf, String topologyId) - throws IOException { + public static String masterStormTmpRoot(Map conf, String topologyId) throws IOException { return masterStormTmpRoot(conf) + FILE_SEPERATEOR + topologyId; } @@ -363,10 +381,8 @@ public class StormConfig { * @return * @throws IOException */ - public static String masterHearbeatForContainer(Map conf) - throws IOException { - String ret = - masterLocalDir(conf) + FILE_SEPERATEOR + "nimbus.heartbeat"; + public static String masterHearbeatForContainer(Map conf) throws IOException { + String ret = masterLocalDir(conf) + FILE_SEPERATEOR + "nimbus.heartbeat"; try { FileUtils.forceMkdir(new File(ret)); } catch (IOException e) { @@ -375,11 +391,15 @@ public class StormConfig { } return ret; } - + public static String masterDbDir(Map conf) throws IOException { return masterLocalDir(conf) + FILE_SEPERATEOR + "rocksdb"; } + public static String metricDbDir(Map conf) throws IOException { + return metricLocalDir(conf) + FILE_SEPERATEOR + "rocksdb"; + } + public static String supervisorTmpDir(Map conf) throws IOException { String ret = null; try { @@ -397,8 +417,7 @@ public class StormConfig { public static LocalState supervisorState(Map conf) throws IOException { LocalState localState = null; try { - String localstateDir = - supervisor_local_dir(conf) + FILE_SEPERATEOR + "localstate"; + String localstateDir = supervisor_local_dir(conf) + FILE_SEPERATEOR + "localstate"; FileUtils.forceMkdir(new File(localstateDir)); localState = new LocalState(localstateDir); } catch (IOException e) { @@ -416,25 +435,20 @@ public class StormConfig { * @return * @throws IOException */ - public static Map read_supervisor_topology_conf(Map conf, String topologyId) - throws IOException { - String topologyRoot = - StormConfig.supervisor_stormdist_root(conf, topologyId); + public static Map read_supervisor_topology_conf(Map conf, String topologyId) throws IOException { + String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId); String confPath = StormConfig.stormconf_path(topologyRoot); return (Map) readLocalObject(topologyId, confPath); } - public static StormTopology read_supervisor_topology_code(Map conf, - String topologyId) throws IOException { - String topologyRoot = - StormConfig.supervisor_stormdist_root(conf, topologyId); + public static StormTopology read_supervisor_topology_code(Map conf, String topologyId) throws IOException { + String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId); String codePath = StormConfig.stormcode_path(topologyRoot); return (StormTopology) readLocalObject(topologyId, codePath); } @SuppressWarnings("rawtypes") - public static List<String> get_supervisor_toplogy_list(Map conf) - throws IOException { + public static List<String> get_supervisor_toplogy_list(Map conf) throws IOException { // get the path: STORM-LOCAL-DIR/supervisor/stormdist/ String path = StormConfig.supervisor_stormdist_root(conf); @@ -444,48 +458,40 @@ public class StormConfig { return topologyids; } - public static Map read_nimbus_topology_conf(Map conf, String topologyId) - throws IOException { + public static Map read_nimbus_topology_conf(Map conf, String topologyId) throws IOException { String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId); return read_topology_conf(topologyRoot, topologyId); } - public static void write_nimbus_topology_conf(Map conf, String topologyId, - Map topoConf) throws IOException { + public static void write_nimbus_topology_conf(Map conf, String topologyId, Map topoConf) throws IOException { String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId); String confPath = StormConfig.stormconf_path(topologyRoot); - FileUtils.writeByteArrayToFile(new File(confPath), - Utils.serialize(topoConf)); + FileUtils.writeByteArrayToFile(new File(confPath), Utils.serialize(topoConf)); } - public static Map read_nimbusTmp_topology_conf(Map conf, String topologyId) - throws IOException { + public static Map read_nimbusTmp_topology_conf(Map conf, String topologyId) throws IOException { String topologyRoot = StormConfig.masterStormTmpRoot(conf, topologyId); return read_topology_conf(topologyRoot, topologyId); } - public static Map read_topology_conf(String topologyRoot, String topologyId) - throws IOException { + public static Map read_topology_conf(String topologyRoot, String topologyId) throws IOException { String readFile = StormConfig.stormconf_path(topologyRoot); return (Map) readLocalObject(topologyId, readFile); } - public static StormTopology read_nimbus_topology_code(Map conf, - String topologyId) throws IOException { + public static StormTopology read_nimbus_topology_code(Map conf, String topologyId) throws IOException { String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId); String codePath = StormConfig.stormcode_path(topologyRoot); return (StormTopology) readLocalObject(topologyId, codePath); } - public static void write_nimbus_topology_code(Map conf, String topologyId, - byte[] data) throws IOException { + public static void write_nimbus_topology_code(Map conf, String topologyId, byte[] data) throws IOException { String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId); String codePath = StormConfig.stormcode_path(topologyRoot); FileUtils.writeByteArrayToFile(new File(codePath), data); } - public static long read_supervisor_topology_timestamp(Map conf, - String topologyId) throws IOException { + public static long read_supervisor_topology_timestamp(Map conf, String topologyId) throws IOException { String stormRoot = supervisor_stormdist_root(conf, topologyId); String timeStampPath = stormts_path(stormRoot); @@ -493,8 +499,7 @@ public class StormConfig { return JStormUtils.bytesToLong(data); } - public static void write_supervisor_topology_timestamp(Map conf, - String topologyId, long timeStamp) throws IOException { + public static void write_supervisor_topology_timestamp(Map conf, String topologyId, long timeStamp) throws IOException { String stormRoot = supervisor_stormdist_root(conf, topologyId); String timeStampPath = stormts_path(stormRoot); @@ -502,6 +507,22 @@ public class StormConfig { FileUtils.writeByteArrayToFile(new File(timeStampPath), data); } + public static long read_nimbus_topology_timestamp(Map conf, String topologyId) throws IOException { + String stormRoot = masterStormdistRoot(conf, topologyId); + String timeStampPath = stormts_path(stormRoot); + + byte[] data = FileUtils.readFileToByteArray(new File(timeStampPath)); + return JStormUtils.bytesToLong(data); + } + + public static void write_nimbus_topology_timestamp(Map conf, String topologyId, long timeStamp) throws IOException { + String stormRoot = masterStormdistRoot(conf, topologyId); + String timeStampPath = stormts_path(stormRoot); + + byte[] data = JStormUtils.longToBytes(timeStamp); + FileUtils.writeByteArrayToFile(new File(timeStampPath), data); + } + /** * stormconf has mergered into clusterconf * @@ -511,12 +532,9 @@ public class StormConfig { * @throws IOException */ @SuppressWarnings("unchecked") - public static Object readLocalObject(String topologyId, String readFile) - throws IOException { + public static Object readLocalObject(String topologyId, String readFile) throws IOException { - String errMsg = - "Failed to get topology configuration of " + topologyId - + " file:" + readFile; + String errMsg = "Failed to get topology configuration of " + topologyId + " file:" + readFile; byte[] bconf = FileUtils.readFileToByteArray(new File(readFile)); if (bconf == null) { @@ -537,10 +555,8 @@ public class StormConfig { return ret; } - public static long get_supervisor_topology_Bianrymodify_time(Map conf, - String topologyId) throws IOException { - String topologyRoot = - StormConfig.supervisor_stormdist_root(conf, topologyId); + public static long get_supervisor_topology_Bianrymodify_time(Map conf, String topologyId) throws IOException { + String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId); File f = new File(topologyRoot); long modifyTime = f.lastModified(); return modifyTime; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java index 935a638..c92b362 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java @@ -44,7 +44,6 @@ public class StormMonitor implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } \ No newline at end of file
