http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java index 5ad70cb..c2e07ee 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java @@ -29,10 +29,8 @@ import com.alibaba.jstorm.daemon.nimbus.StatusType; * * Dedicate Topology status * - * Topology status: active/inactive/killed/rebalancing killTimeSecs: when status - * isn't killed, it is -1 and useless. when status is killed, do kill operation - * after killTimeSecs seconds when status is rebalancing, do rebalancing opation - * after delaySecs seconds restore oldStatus as current status + * Topology status: active/inactive/killed/rebalancing killTimeSecs: when status isn't killed, it is -1 and useless. when status is killed, do kill operation + * after killTimeSecs seconds when status is rebalancing, do rebalancing opation after delaySecs seconds restore oldStatus as current status */ public class StormStatus implements Serializable { @@ -99,9 +97,7 @@ public class StormStatus implements Serializable { } StormStatus check = (StormStatus) base; - if (check.getStatusType().equals(getStatusType()) - && check.getKillTimeSecs() == getKillTimeSecs() - && check.getDelaySecs().equals(getDelaySecs())) { + if (check.getStatusType().equals(getStatusType()) && check.getKillTimeSecs() == getKillTimeSecs() && check.getDelaySecs().equals(getDelaySecs())) { return true; } return false; @@ -109,15 +105,12 @@ public class StormStatus implements Serializable { @Override public int hashCode() { - return this.getStatusType().hashCode() - + this.getKillTimeSecs().hashCode() - + this.getDelaySecs().hashCode(); + return this.getStatusType().hashCode() + this.getKillTimeSecs().hashCode() + this.getDelaySecs().hashCode(); } @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java index bd60d45..1550c7e 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java @@ -17,18 +17,8 @@ */ package com.alibaba.jstorm.cluster; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import backtype.storm.generated.TopologyTaskHbInfo; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.cache.JStormCache; import com.alibaba.jstorm.callback.ClusterStateCallback; import com.alibaba.jstorm.callback.RunnableCallback; @@ -38,14 +28,22 @@ import com.alibaba.jstorm.schedule.Assignment; import com.alibaba.jstorm.schedule.AssignmentBak; import com.alibaba.jstorm.task.TaskInfo; import com.alibaba.jstorm.task.error.TaskError; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat; +import com.alibaba.jstorm.task.backpressure.SourceBackpressureInfo; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; import com.alibaba.jstorm.utils.TimeUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; public class StormZkClusterState implements StormClusterState { - private static Logger LOG = LoggerFactory - .getLogger(StormZkClusterState.class); + private static Logger LOG = LoggerFactory.getLogger(StormZkClusterState.class); private ClusterState cluster_state; @@ -67,12 +65,10 @@ public class StormZkClusterState implements StormClusterState { } else { solo = true; - cluster_state = - new DistributedClusterState((Map) cluster_state_spec); + cluster_state = new DistributedClusterState((Map) cluster_state_spec); } - assignment_info_callback = - new ConcurrentHashMap<String, RunnableCallback>(); + assignment_info_callback = new ConcurrentHashMap<String, RunnableCallback>(); supervisors_callback = new AtomicReference<RunnableCallback>(null); assignments_callback = new AtomicReference<RunnableCallback>(null); storm_base_callback = new ConcurrentHashMap<String, RunnableCallback>(); @@ -85,8 +81,7 @@ public class StormZkClusterState implements StormClusterState { LOG.warn("Input args is null"); return null; } else if (args.length < 2) { - LOG.warn("Input args is invalid, args length:" - + args.length); + LOG.warn("Input args is invalid, args length:" + args.length); return null; } @@ -132,11 +127,8 @@ public class StormZkClusterState implements StormClusterState { }); String[] pathlist = - JStormUtils.mk_arr(Cluster.SUPERVISORS_SUBTREE, - Cluster.STORMS_SUBTREE, Cluster.ASSIGNMENTS_SUBTREE, - Cluster.ASSIGNMENTS_BAK_SUBTREE, Cluster.TASKS_SUBTREE, - Cluster.TASKBEATS_SUBTREE, Cluster.TASKERRORS_SUBTREE, - Cluster.METRIC_SUBTREE); + JStormUtils.mk_arr(Cluster.SUPERVISORS_SUBTREE, Cluster.STORMS_SUBTREE, Cluster.ASSIGNMENTS_SUBTREE, Cluster.ASSIGNMENTS_BAK_SUBTREE, + Cluster.TASKS_SUBTREE, Cluster.TASKBEATS_SUBTREE, Cluster.TASKERRORS_SUBTREE, Cluster.METRIC_SUBTREE, Cluster.BACKPRESSURE_SUBTREE); for (String path : pathlist) { cluster_state.mkdirs(path); } @@ -146,8 +138,7 @@ public class StormZkClusterState implements StormClusterState { /** * @@@ TODO * - * Just add cache in lower ZK level In fact, for some Object - * Assignment/TaskInfo/StormBase These object can be cache for long time + * Just add cache in lower ZK level In fact, for some Object Assignment/TaskInfo/StormBase These object can be cache for long time * * @param simpleCache */ @@ -221,10 +212,10 @@ public class StormZkClusterState implements StormClusterState { deleteObject(Cluster.storm_task_root(topologyId)); teardown_heartbeats(topologyId); teardown_task_errors(topologyId); + teardown_backpressure(topologyId); deleteObject(Cluster.metric_path(topologyId)); } catch (Exception e) { - LOG.warn("Failed to delete task root and monitor root for" - + topologyId); + LOG.warn("Failed to delete task root and monitor root for" + topologyId); } remove_storm_base(topologyId); } @@ -240,8 +231,7 @@ public class StormZkClusterState implements StormClusterState { } @Override - public Assignment assignment_info(String topologyId, - RunnableCallback callback) throws Exception { + public Assignment assignment_info(String topologyId, RunnableCallback callback) throws Exception { if (callback != null) { assignment_info_callback.put(topologyId, callback); } @@ -257,13 +247,11 @@ public class StormZkClusterState implements StormClusterState { if (callback != null) { assignments_callback.set(callback); } - return cluster_state.get_children(Cluster.ASSIGNMENTS_SUBTREE, - callback != null); + return cluster_state.get_children(Cluster.ASSIGNMENTS_SUBTREE, callback != null); } @Override - public void set_assignment(String topologyId, Assignment info) - throws Exception { + public void set_assignment(String topologyId, Assignment info) throws Exception { setObject(Cluster.assignment_path(topologyId), info); } @@ -276,26 +264,22 @@ public class StormZkClusterState implements StormClusterState { } @Override - public void backup_assignment(String topologyName, AssignmentBak info) - throws Exception { + public void backup_assignment(String topologyName, AssignmentBak info) throws Exception { setObject(Cluster.assignment_bak_path(topologyName), info); } @Override - public StormBase storm_base(String topologyId, RunnableCallback callback) - throws Exception { + public StormBase storm_base(String topologyId, RunnableCallback callback) throws Exception { if (callback != null) { storm_base_callback.put(topologyId, callback); } - return (StormBase) getObject(Cluster.storm_path(topologyId), - callback != null); + return (StormBase) getObject(Cluster.storm_path(topologyId), callback != null); } @Override - public void activate_storm(String topologyId, StormBase stormBase) - throws Exception { + public void activate_storm(String topologyId, StormBase stormBase) throws Exception { String stormPath = Cluster.storm_path(topologyId); setObject(stormPath, stormBase); @@ -307,8 +291,7 @@ public class StormZkClusterState implements StormClusterState { } @Override - public void update_storm(String topologyId, StormStatus newElems) - throws Exception { + public void update_storm(String topologyId, StormStatus newElems) throws Exception { /** * FIXME, maybe overwrite old callback */ @@ -323,8 +306,7 @@ public class StormZkClusterState implements StormClusterState { } @Override - public void set_storm_monitor(String topologyId, boolean isEnable) - throws Exception { + public void set_storm_monitor(String topologyId, boolean isEnable) throws Exception { // TODO Auto-generated method stub StormBase base = this.storm_base(topologyId, null); @@ -340,30 +322,20 @@ public class StormZkClusterState implements StormClusterState { } @Override - public void setup_heartbeats(String topologyId) throws Exception { - String taskbeatPath = Cluster.taskbeat_storm_root(topologyId); - - cluster_state.mkdirs(taskbeatPath); - } - - @Override - public List<String> heartbeat_storms() throws Exception { - return cluster_state.get_children(Cluster.TASKBEATS_SUBTREE, false); + public void topology_heartbeat(String topologyId, TopologyTaskHbInfo info) throws Exception { + String taskPath = Cluster.taskbeat_storm_root(topologyId); + setObject(taskPath, info); } @Override - public List<String> heartbeat_tasks(String topologyId) throws Exception { - String taskbeatPath = Cluster.taskbeat_storm_root(topologyId); - - return cluster_state.get_children(taskbeatPath, false); + public TopologyTaskHbInfo topology_heartbeat(String topologyId) throws Exception { + String taskPath = Cluster.taskbeat_storm_root(topologyId); + return (TopologyTaskHbInfo) getObject(taskPath, false); } @Override - public void remove_task_heartbeat(String topologyId, int taskId) - throws Exception { - String taskbeatPath = Cluster.taskbeat_path(topologyId, taskId); - - deleteObject(taskbeatPath); + public List<String> heartbeat_storms() throws Exception { + return cluster_state.get_children(Cluster.TASKBEATS_SUBTREE, false); } @Override @@ -379,14 +351,11 @@ public class StormZkClusterState implements StormClusterState { } @Override - public void report_task_error(String topologyId, int taskId, Throwable error) - throws Exception { - report_task_error(topologyId, taskId, - new String(JStormUtils.getErrorInfo(error))); + public void report_task_error(String topologyId, int taskId, Throwable error) throws Exception { + report_task_error(topologyId, taskId, new String(JStormUtils.getErrorInfo(error)), null); } - public void report_task_error(String topologyId, int taskId, String error) - throws Exception { + public void report_task_error(String topologyId, int taskId, String error, String tag) throws Exception { boolean found = false; String path = Cluster.taskerror_path(topologyId, taskId); cluster_state.mkdirs(path); @@ -403,9 +372,10 @@ public class StormZkClusterState implements StormClusterState { deleteObject(errorPath); continue; } - if (errorInfo.equals(error)) { - deleteObject(errorPath); - setObject(timestampPath, error); + if (errorInfo.equals(error) + || (tag != null && errorInfo.startsWith(tag))) { + cluster_state.delete_node(errorPath); + cluster_state.set_data(timestampPath, error.getBytes()); found = true; break; } @@ -429,8 +399,7 @@ public class StormZkClusterState implements StormClusterState { private static final String TASK_IS_DEAD = "is dead on"; // Full string is // "task-id is dead on hostname:port" - private void setLastErrInfo(String topologyId, String error, - String timeStamp) throws Exception { + private void setLastErrInfo(String topologyId, String error, String timeStamp) throws Exception { // Set error information in task error topology patch // Last Error information format in ZK: map<report_duration, timestamp> // report_duration means only the errors will presented in web ui if the @@ -440,13 +409,10 @@ public class StormZkClusterState implements StormClusterState { String lastErrTopoPath = Cluster.lasterror_path(topologyId); Map<Integer, String> lastErrInfo = null; try { - lastErrInfo = - (Map<Integer, String>) getObject(lastErrTopoPath, false); + lastErrInfo = (Map<Integer, String>) getObject(lastErrTopoPath, false); } catch (Exception e) { - LOG.error( - "Failed to get last error time. Remove the corrupt node for " - + topologyId, e); + LOG.error("Failed to get last error time. Remove the corrupt node for " + topologyId, e); remove_lastErr_time(topologyId); lastErrInfo = null; } @@ -466,15 +432,13 @@ public class StormZkClusterState implements StormClusterState { } @Override - public void remove_task_error(String topologyId, int taskId) - throws Exception { + public void remove_task_error(String topologyId, int taskId) throws Exception { String path = Cluster.taskerror_path(topologyId, taskId); cluster_state.delete_node(path); } @Override - public Map<Integer, String> topo_lastErr_time(String topologyId) - throws Exception { + public Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception { String path = Cluster.lasterror_path(topologyId); return (Map<Integer, String>) getObject(path, false); @@ -490,17 +454,18 @@ public class StormZkClusterState implements StormClusterState { public List<String> task_error_storms() throws Exception { return cluster_state.get_children(Cluster.TASKERRORS_SUBTREE, false); } - + @Override public List<String> task_error_ids(String topologyId) throws Exception { - return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false); + return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false); } @Override - public List<String> task_error_time(String topologyId, int taskId) - throws Exception { + public List<String> task_error_time(String topologyId, int taskId) throws Exception { String path = Cluster.taskerror_path(topologyId, taskId); - cluster_state.mkdirs(path); + if (cluster_state.node_existed(path, false) == false) { + return new ArrayList<String>(); + } return cluster_state.get_children(path, false); } @@ -509,38 +474,37 @@ public class StormZkClusterState implements StormClusterState { String tasksPath = Cluster.storm_task_root(topologyId); Object data = getObject(tasksPath, false); if (data != null) { - Map<Integer, TaskInfo> taskInfoMap = ((Map<Integer, TaskInfo>)data); - for (Integer taskId : taskIds){ + Map<Integer, TaskInfo> taskInfoMap = ((Map<Integer, TaskInfo>) data); + for (Integer taskId : taskIds) { taskInfoMap.remove(taskId); } - //update zk node of tasks + // update zk node of tasks setObject(tasksPath, taskInfoMap); } } @Override - public String task_error_info(String topologyId, int taskId, long timeStamp) - throws Exception { + public String task_error_info(String topologyId, int taskId, long timeStamp) throws Exception { String path = Cluster.taskerror_path(topologyId, taskId); - cluster_state.mkdirs(path); path = path + "/" + timeStamp; return getString(path, false); } @Override - public List<TaskError> task_errors(String topologyId, int taskId) - throws Exception { - String path = Cluster.taskerror_path(topologyId, taskId); - cluster_state.mkdirs(path); + public List<TaskError> task_errors(String topologyId, int taskId) throws Exception { + List<TaskError> errors = new ArrayList<TaskError>(); + String path = Cluster.taskerror_path(topologyId, taskId); + if (cluster_state.node_existed(path, false) == false) { + return errors; + } List<String> children = cluster_state.get_children(path, false); - List<TaskError> errors = new ArrayList<TaskError>(); + for (String str : children) { byte[] v = cluster_state.get_data(path + "/" + str, false); if (v != null) { - TaskError error = - new TaskError(new String(v), Integer.parseInt(str)); + TaskError error = new TaskError(new String(v), Integer.parseInt(str)); errors.add(error); } } @@ -572,45 +536,28 @@ public class StormZkClusterState implements StormClusterState { LOG.error("Could not teardown errors for " + topologyId, e); } } + @Override - public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) - throws Exception { + public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception { String stormTaskPath = Cluster.storm_task_root(topologyId); - if (taskInfoMap != null){ - //reupdate zk node of tasks + if (taskInfoMap != null) { + // reupdate zk node of tasks setObject(stormTaskPath, taskInfoMap); } } + @Override - public void add_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) - throws Exception { + public void add_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception { String stormTaskPath = Cluster.storm_task_root(topologyId); Object data = getObject(stormTaskPath, false); - if (data != null){ - ((Map<Integer, TaskInfo>)data).putAll(taskInfoMap); - //reupdate zk node of tasks + if (data != null) { + ((Map<Integer, TaskInfo>) data).putAll(taskInfoMap); + // reupdate zk node of tasks setObject(stormTaskPath, data); } } @Override - public TaskHeartbeat task_heartbeat(String topologyId, int taskId) - throws Exception { - String taskbeatPath = Cluster.taskbeat_path(topologyId, taskId); - - return (TaskHeartbeat) getObjectSync(taskbeatPath, false); - - } - - @Override - public void task_heartbeat(String topologyId, int taskId, TaskHeartbeat info) - throws Exception { - String taskPath = Cluster.taskbeat_path(topologyId, taskId); - - setObject(taskPath, info); - } - - @Override public List<String> task_storms() throws Exception { return cluster_state.get_children(Cluster.TASKS_SUBTREE, false); } @@ -623,23 +570,22 @@ public class StormZkClusterState implements StormClusterState { if (data == null) { return null; } - return ((Map<Integer, TaskInfo>)data).keySet(); + return ((Map<Integer, TaskInfo>) data).keySet(); } @Override - public Set<Integer> task_ids_by_componentId(String topologyId, - String componentId) throws Exception { + public Set<Integer> task_ids_by_componentId(String topologyId, String componentId) throws Exception { String stormTaskPath = Cluster.storm_task_root(topologyId); Object data = getObject(stormTaskPath, false); if (data == null) { return null; } - Map<Integer, TaskInfo> taskInfoMap = (Map<Integer, TaskInfo>)data; + Map<Integer, TaskInfo> taskInfoMap = (Map<Integer, TaskInfo>) data; Set<Integer> rtn = new HashSet<Integer>(); Set<Integer> taskIds = taskInfoMap.keySet(); - for(Integer taskId : taskIds){ + for (Integer taskId : taskIds) { TaskInfo taskInfo = taskInfoMap.get(taskId); - if (taskInfo != null){ + if (taskInfo != null) { if (taskInfo.getComponentId().equalsIgnoreCase(componentId)) rtn.add(taskId); } @@ -672,13 +618,11 @@ public class StormZkClusterState implements StormClusterState { if (callback != null) { supervisors_callback.set(callback); } - return cluster_state.get_children(Cluster.SUPERVISORS_SUBTREE, - callback != null); + return cluster_state.get_children(Cluster.SUPERVISORS_SUBTREE, callback != null); } @Override - public void supervisor_heartbeat(String supervisorId, SupervisorInfo info) - throws Exception { + public void supervisor_heartbeat(String supervisorId, SupervisorInfo info) throws Exception { String supervisorPath = Cluster.supervisor_path(supervisorId); @@ -703,15 +647,13 @@ public class StormZkClusterState implements StormClusterState { } public String get_nimbus_slave_time(String host) throws Exception { - String path = - Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host; - return (String) getObject(path, false); + String path = Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host; + return getString(path, false); } @Override public void update_nimbus_slave(String host, int time) throws Exception { - setTempObject(Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR - + host, String.valueOf(time)); + setTempObject(Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host, String.valueOf(time)); } @Override @@ -720,8 +662,24 @@ public class StormZkClusterState implements StormClusterState { } @Override - public boolean try_to_be_leader(String path, String host, - RunnableCallback callback) throws Exception { + public void update_nimbus_detail(String hostPort, Map map) throws Exception { + // TODO Auto-generated method stub + cluster_state.set_ephemeral_node(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort, Utils.serialize(map)); + } + + @Override + public Map get_nimbus_detail(String hostPort, boolean watch) throws Exception { + byte[] data = cluster_state.get_data(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort, watch); + return (Map) Utils.maybe_deserialize(data); + } + @Override + public void unregister_nimbus_detail(String hostPort) throws Exception { + cluster_state.delete_node(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort); + } + + + @Override + public boolean try_to_be_leader(String path, String host, RunnableCallback callback) throws Exception { // TODO Auto-generated method stub if (callback != null) this.master_callback.set(callback); @@ -736,24 +694,53 @@ public class StormZkClusterState implements StormClusterState { } @Override - public void set_topology_metric(String topologyId, Object metric) - throws Exception { - // TODO Auto-generated method stub + public void set_topology_metric(String topologyId, Object metric) throws Exception { String path = Cluster.metric_path(topologyId); - setObject(path, metric); } @Override public Object get_topology_metric(String topologyId) throws Exception { - // TODO Auto-generated method stub return getObject(Cluster.metric_path(topologyId), false); } - @Override - public List<String> get_metrics() throws Exception { - // TODO Auto-generated method stub - return cluster_state.get_children(Cluster.METRIC_SUBTREE, false); - } + @Override + public List<String> get_metrics() throws Exception { + return cluster_state.get_children(Cluster.METRIC_SUBTREE, false); + } + @Override + public List<String> list_dirs(String path, boolean watch) throws Exception { + List<String> subDirs = null; + subDirs = cluster_state.get_children(path, watch); + return subDirs; + } + @Override + public List<String> backpressureInfos() throws Exception { + return cluster_state.get_children(Cluster.BACKPRESSURE_SUBTREE, false); + } + + @Override + public void set_backpressure_info(String topologyId, Map<String, SourceBackpressureInfo> sourceToBackpressureInfo) throws Exception { + String path = Cluster.backpressure_path(topologyId); + cluster_state.set_data(path, Utils.serialize(sourceToBackpressureInfo)); + } + + @Override + public Map<String, SourceBackpressureInfo> get_backpressure_info(String topologyId) throws Exception { + String path = Cluster.backpressure_path(topologyId); + byte[] data = cluster_state.get_data(path, false); + return (Map<String, SourceBackpressureInfo>) Utils.maybe_deserialize(data); + } + + @Override + public void teardown_backpressure(String topologyId) { + try { + String backpressurePath = Cluster.backpressure_path(topologyId); + + cluster_state.delete_node(backpressurePath); + } catch (Exception e) { + LOG.warn("Could not teardown backpressure info for " + topologyId, e); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java new file mode 100644 index 0000000..120bbfb --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric; + + +import com.alibaba.jstorm.common.metric.snapshot.AsmCounterSnapshot; +import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot; +import com.codahale.metrics.Counter; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * counter wrapper. note that counter is a little special, every snapshot we only return the delta value instead of + * total value, which prevents data loss if certain tasks are killed. + */ +public class AsmCounter extends AsmMetric<Counter> { + + private final Map<Integer, Counter> counterMap = new ConcurrentHashMap<>(); + private Counter unFlushed = new Counter(); + + public AsmCounter() { + super(); + for (int win : windowSeconds) { + counterMap.put(win, new Counter()); + } + } + + public void inc() { + update(1); + } + + @Override + public void update(Number val) { + this.unFlushed.inc(val.longValue()); + } + + /** + * flush temp counter data to all windows & assoc metrics. + */ + protected void doFlush() { + long v = unFlushed.getCount(); + for (Counter counter : counterMap.values()) { + counter.inc(v); + } + for (AsmMetric assocMetric : assocMetrics) { + assocMetric.updateDirectly(v); + } + + this.unFlushed.dec(v); + } + + @Override + public Map<Integer, Counter> getWindowMetricMap() { + return counterMap; + } + + @Override + public Counter mkInstance() { + return new Counter(); + } + + @Override + protected void updateSnapshot(int window) { + Counter counter = counterMap.get(window); + if (counter != null) { + AsmSnapshot snapshot = new AsmCounterSnapshot().setValue(counter.getCount()) + .setTs(System.currentTimeMillis()).setMetricId(metricId); + snapshots.put(window, snapshot); + } + } + + @Override + public AsmMetric clone() { + return new AsmCounter(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java new file mode 100644 index 0000000..4bc255a --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.common.metric.snapshot.AsmGaugeSnapshot; +import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot; +import com.codahale.metrics.Gauge; + +import java.util.Map; + +/** + * gauges cannot be aggregated. + */ +public class AsmGauge extends AsmMetric<Gauge> { + + private Gauge gauge; + + public AsmGauge(Gauge<Double> gauge) { + this.aggregate = false; + this.gauge = gauge; + } + + @Override + public void update(Number obj) { + // nothing to do for gauges. + } + + @Override + public AsmMetric clone() { + AsmMetric metric = new AsmGauge(this.gauge); + metric.setMetricName(this.getMetricName()); + return metric; + } + + @Override + public Map<Integer, Gauge> getWindowMetricMap() { + return null; + } + + @Override + public Gauge mkInstance() { + return null; + } + + @Override + protected void doFlush() { + // nothing to do for gauges. + } + + @Override + protected void updateSnapshot(int window) { + double v = (Double) gauge.getValue(); + AsmSnapshot snapshot = new AsmGaugeSnapshot().setValue(v) + .setTs(System.currentTimeMillis()).setMetricId(metricId); + snapshots.put(window, snapshot); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java new file mode 100644 index 0000000..43c8dbc --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.common.metric.snapshot.AsmHistogramSnapshot; +import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot; +import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.Histogram; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * each window has a separate histogram, which is recreated after the window cycle. + */ +public class AsmHistogram extends AsmMetric<Histogram> { + + private final Map<Integer, Histogram> histogramMap = new ConcurrentHashMap<Integer, Histogram>(); + private Histogram unFlushed = newHistogram(); + + public AsmHistogram() { + super(); + for (int win : windowSeconds) { + histogramMap.put(win, newHistogram()); + } + } + + @Override + public void update(Number obj) { + if (sample()) { + this.unFlushed.update(obj.longValue()); + } + } + + @Override + public void updateDirectly(Number obj) { + this.unFlushed.update(obj.longValue()); + } + + @Override + public Map<Integer, Histogram> getWindowMetricMap() { + return histogramMap; + } + + @Override + public Histogram mkInstance() { + return newHistogram(); + } + + @Override + protected void updateSnapshot(int window) { + Histogram histogram = histogramMap.get(window); + if (histogram != null) { + AsmSnapshot snapshot = new AsmHistogramSnapshot().setSnapshot(histogram.getSnapshot()) + .setTs(System.currentTimeMillis()).setMetricId(metricId); + snapshots.put(window, snapshot); + } + } + + /** + * flush temp histogram data to all windows & assoc metrics. + */ + protected void doFlush() { + long[] values = unFlushed.getSnapshot().getValues(); + for (Histogram histogram : histogramMap.values()) { + for (long val : values) { + histogram.update(val); + } + } + for (long val : values) { + for (AsmMetric metric : this.assocMetrics) { + metric.updateDirectly(val); + } + } + this.unFlushed = newHistogram(); + } + + @Override + public AsmMetric clone() { + return new AsmHistogram(); + } + + private Histogram newHistogram() { + return new Histogram(new ExponentiallyDecayingReservoir()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java new file mode 100644 index 0000000..8959800 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.common.metric.snapshot.AsmMeterSnapshot; +import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot; +import com.codahale.metrics.Meter; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * one meter & one snapshot for all windows. since meter is window-sliding, there's no need to recreate new ones. + */ +public class AsmMeter extends AsmMetric<Meter> { + private final Meter meter = new Meter(); + + public void mark() { + meter.mark(1l); + } + + @Override + public void update(Number obj) { + meter.mark(obj.longValue()); + for (AsmMetric metric : this.assocMetrics) { + metric.update(obj); + } + } + + + @Override + public AsmMetric clone() { + return new AsmMeter(); + } + + @Override + public Map<Integer, Meter> getWindowMetricMap() { + return null; + } + + @Override + protected void doFlush() { + // nothing to do for meters. + } + + @Override + protected void updateSnapshot(int window) { + AsmMeterSnapshot meterSnapshot = new AsmMeterSnapshot(); + meterSnapshot.setM1(meter.getOneMinuteRate()).setM5(meter.getFiveMinuteRate()).setM15(meter.getFifteenMinuteRate()).setMean(meter.getMeanRate()) + .setTs(System.currentTimeMillis()).setMetricId(metricId); + snapshots.put(window, meterSnapshot); + } + + @Override + public Meter mkInstance() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java new file mode 100644 index 0000000..d399e12 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot; +import com.alibaba.jstorm.metric.AsmWindow; +import com.alibaba.jstorm.metric.MetaType; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.utils.TimeUtils; +import com.codahale.metrics.Metric; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public abstract class AsmMetric<T extends Metric> { + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + private static final Joiner JOINER = Joiner.on("."); + + protected static final List<Integer> windowSeconds = Lists + .newArrayList(AsmWindow.M1_WINDOW, AsmWindow.M10_WINDOW, AsmWindow.H2_WINDOW, AsmWindow.D1_WINDOW); + protected static final List<Integer> nettyWindows = Lists.newArrayList(AsmWindow.M1_WINDOW); + + protected static int minWindow = AsmWindow.M1_WINDOW; + protected static final List<Integer> EMPTY_WIN = Lists.newArrayListWithCapacity(0); + /** + * sample rate for meter, histogram and timer, note that counter & gauge are not sampled. + */ + private static double sampleRate = ConfigExtension.DEFAULT_METRIC_SAMPLE_RATE; + + protected int op = MetricOp.REPORT; + protected volatile long metricId = 0L; + protected String metricName; + protected boolean aggregate = true; + protected volatile long lastFlushTime = TimeUtils.current_time_secs() - AsmWindow.M1_WINDOW; + protected Map<Integer, Long> rollingTimeMap = new ConcurrentHashMap<>(); + protected Map<Integer, Boolean> rollingDirtyMap = new ConcurrentHashMap<>(); + + protected final Map<Integer, AsmSnapshot> snapshots = new ConcurrentHashMap<Integer, AsmSnapshot>(); + + protected Set<AsmMetric> assocMetrics = new HashSet<AsmMetric>(); + + public AsmMetric() { + for (Integer win : windowSeconds) { + rollingTimeMap.put(win, lastFlushTime); + rollingDirtyMap.put(win, false); + } + } + + /** + * keep a random for each instance to avoid competition (although it's thread-safe). + */ + private final Random rand = new Random(); + + protected boolean sample() { + return rand.nextDouble() <= sampleRate; + } + + public static void setSampleRate(double sampleRate) { + AsmMetric.sampleRate = sampleRate; + } + + /** + * In order to improve performance + */ + public abstract void update(Number obj); + + + public void updateDirectly(Number obj) { + update(obj); + } + + public abstract AsmMetric clone(); + + public AsmMetric setOp(int op) { + this.op = op; + return this; + } + + public int getOp() { + return this.op; + } + + /** + * for test + */ + public static void setWindowSeconds(List<Integer> windows) { + synchronized (windowSeconds) { + windowSeconds.clear(); + windowSeconds.addAll(windows); + + minWindow = getMinWindow(windows); + } + } + + public static int getMinWindow(List<Integer> windows) { + int min = Integer.MAX_VALUE; + for (int win : windows) { + if (win < min) { + min = win; + } + } + return min; + } + + public void addAssocMetrics(AsmMetric... metrics) { + Collections.addAll(assocMetrics, metrics); + } + + public long getMetricId() { + return metricId; + } + + public void setMetricId(long metricId) { + this.metricId = metricId; + } + + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + public void flush() { + long time = TimeUtils.current_time_secs(); + List<Integer> windows = getValidWindows(); + if (windows.size() == 0) { + return; + } + + doFlush(); + + List<Integer> rollwindows = rollWindows(time, windows); + + for (int win : windows) { + if (rollwindows.contains(win)) { + updateSnapshot(win); + + Map<Integer, T> metricMap = getWindowMetricMap(); + if (metricMap != null) { + metricMap.put(win, mkInstance()); + } + } else if (!rollingDirtyMap.get(win)) { + //if this window has never been passed, we still update this window snapshot + updateSnapshot(win); + } + } + this.lastFlushTime = TimeUtils.current_time_secs(); + } + + public List<Integer> rollWindows(long time, List<Integer> windows) { + List<Integer> rolling = new ArrayList<>(); + for (Integer win : windows) { + long rollingTime = rollingTimeMap.get(win); + // might delay somehow, so add extra 5 sec bias + if (time - rollingTime >= win - 5) { + rolling.add(win); + rollingDirtyMap.put(win, true); //mark this window has been passed + rollingTimeMap.put(win, (long) TimeUtils.current_time_secs()); + } + } + return rolling; + } + + /** + * flush temp data to all windows & assoc metrics. + */ + protected abstract void doFlush(); + + public abstract Map<Integer, T> getWindowMetricMap(); + + public abstract T mkInstance(); + + protected abstract void updateSnapshot(int window); + + public Map<Integer, AsmSnapshot> getSnapshots() { + return snapshots; + } + + /** + * DO NOT judge whether to flush by 60sec because there might be nuance by the alignment of time(maybe less than 1 sec?) + * so we subtract 5 sec from a min flush window. + */ + public List<Integer> getValidWindows() { + long diff = TimeUtils.current_time_secs() - this.lastFlushTime + 5; + if (diff < minWindow) { + // logger.warn("no valid windows for metric:{}, diff:{}", this.metricName, diff); + return EMPTY_WIN; + } + // for netty metrics, use only 1min window + if (this.metricName.startsWith(MetaType.NETTY.getV())) { + return nettyWindows; + } + + return windowSeconds; + } + + public boolean isAggregate() { + return aggregate; + } + + public void setAggregate(boolean aggregate) { + this.aggregate = aggregate; + } + + public static String mkName(Object... parts) { + return JOINER.join(parts); + } + + public static class MetricOp { + public static final int LOG = 1; + public static final int REPORT = 2; + } + + public static class Builder { + public static AsmMetric build(MetricType metricType) { + AsmMetric metric; + if (metricType == MetricType.COUNTER) { + metric = new AsmCounter(); + } else if (metricType == MetricType.METER) { + metric = new AsmMeter(); + } else if (metricType == MetricType.HISTOGRAM) { + metric = new AsmHistogram(); + } else if (metricType == MetricType.TIMER) { + metric = new AsmTimer(); + } else { + throw new IllegalArgumentException("invalid metric type:" + metricType); + } + return metric; + } + } + + public static void main(String[] args) throws Exception { + AsmMeter meter = new AsmMeter(); + int t = 0, f = 0; + for (int i = 0; i < 100; i++) { + if (meter.sample()) { + t++; + } else { + f++; + } + } + System.out.println(t + "," + f); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java new file mode 100644 index 0000000..d9f46e1 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.common.metric.snapshot.AsmMeterSnapshot; +import com.alibaba.jstorm.common.metric.snapshot.AsmTimerSnapshot; +import com.codahale.metrics.*; +import com.codahale.metrics.Timer; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * same as histogram, each window has a separate timer, which is recreated after the window cycle. note that all data in a timer are measured by nanoseconds. so + * for most cases, you can replace with histograms. + */ +public class AsmTimer extends AsmMetric<Timer> { + private final Map<Integer, Timer> timerMap = new ConcurrentHashMap<Integer, Timer>(); + private Timer unFlushed = newTimer(); + + public AsmTimer() { + super(); + for (int win : windowSeconds) { + timerMap.put(win, newTimer()); + } + } + + @Override + public void update(Number obj) { + if (sample()) { + this.unFlushed.update(obj.longValue(), TimeUnit.MILLISECONDS); + } + } + + @Override + public void updateDirectly(Number obj) { + this.unFlushed.update(obj.longValue(), TimeUnit.MILLISECONDS); + } + + @Override + public Map<Integer, Timer> getWindowMetricMap() { + return timerMap; + } + + @Override + public Timer mkInstance() { + return newTimer(); + } + + @Override + protected void updateSnapshot(int window) { + Timer timer = timerMap.get(window); + if (timer != null){ + AsmTimerSnapshot timerSnapshot = new AsmTimerSnapshot(); + timerSnapshot.setHistogram(timer.getSnapshot()); + timerSnapshot.setMeter(new AsmMeterSnapshot().setM1(timer.getOneMinuteRate()).setM5(timer.getFiveMinuteRate()) + .setM15(timer.getFifteenMinuteRate()).setMean(timer.getMeanRate())); + if (metricId > 0) { + timerSnapshot.setMetricId(metricId); + } + timerSnapshot.setTs(System.currentTimeMillis()); + snapshots.put(window, timerSnapshot); + } + } + + /** + * flush temp timer data to all windows & assoc metrics. + */ + protected void doFlush() { + long[] values = unFlushed.getSnapshot().getValues(); + for (Timer timer : timerMap.values()) { + for (long val : values) { + timer.update(val, TimeUnit.MILLISECONDS); + + for (AsmMetric metric : this.assocMetrics) { + metric.updateDirectly(val); + } + } + } + this.unFlushed = newTimer(); + } + + @Override + public AsmMetric clone() { + return new AsmTimer(); + } + + private Timer newTimer() { + return new Timer(new ExponentiallyDecayingReservoir()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java deleted file mode 100755 index f9e97dd..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import com.alibaba.jstorm.common.metric.operator.convert.DefaultConvertor; -import com.alibaba.jstorm.common.metric.operator.merger.SumMerger; -import com.alibaba.jstorm.common.metric.operator.updater.AddUpdater; -import com.alibaba.jstorm.common.metric.window.Metric; - -/** - * The class is similar to com.codahale.metrics.Counter - * - * Sum all window's value - * - * how to use Counter , please refer to Sampling Interface - * - * @author zhongyan.feng - * - * @param <T> - */ -public class Counter<T extends Number> extends Metric<T, T> { - private static final long serialVersionUID = -1362345159511508074L; - - /** - * - * @param defaultValue - */ - public Counter(T zero) { - updater = new AddUpdater<T>(); - merger = new SumMerger<T>(); - convertor = new DefaultConvertor<T>(); - defaultValue = zero; - - init(); - } - - public static void main(String[] args) { - - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java new file mode 100644 index 0000000..03d13be --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java @@ -0,0 +1,34 @@ +package com.alibaba.jstorm.common.metric; + + +import com.alibaba.jstorm.metric.Bytes; +import com.alibaba.jstorm.metric.KVSerializable; + +/** + * @author wange + * @since 15/6/23 + */ +public class CounterData extends MetricBaseData implements KVSerializable { + private long v; + + public long getV() { + return v; + } + + public void setV(long v) { + this.v = v; + } + + @Override + public byte[] getValue() { + return Bytes.toBytes(v); + } + + @Override + public Object fromKV(byte[] key, byte[] value) { + parseKey(key); + this.v = Bytes.toLong(value); + + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java deleted file mode 100755 index 30fa110..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import java.util.Map; -import java.util.TreeMap; - -import com.alibaba.jstorm.common.metric.window.Metric; -import com.alibaba.jstorm.common.metric.window.StatBuckets; - -public class Gauge<T extends Number> extends Metric<Number, Number> { - private static final long serialVersionUID = 1985614006717750790L; - - protected com.codahale.metrics.Gauge<T> gauge; - - public Gauge(com.codahale.metrics.Gauge<T> gauge) { - this.gauge = gauge; - - init(); - } - - @Override - public void init() { - - } - - @Override - public void update(Number obj) { - // TODO Auto-generated method stub - } - - @Override - public Map<Integer, Number> getSnapshot() { - // TODO Auto-generated method stub - Number value = gauge.getValue(); - - Map<Integer, Number> ret = new TreeMap<Integer, Number>(); - for (Integer timeKey : windowSeconds) { - ret.put(timeKey, value); - } - ret.put(StatBuckets.ALL_TIME_WINDOW, value); - - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java new file mode 100644 index 0000000..134a194 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java @@ -0,0 +1,34 @@ +package com.alibaba.jstorm.common.metric; + + +import com.alibaba.jstorm.metric.Bytes; +import com.alibaba.jstorm.metric.KVSerializable; + +/** + * @author wange + * @since 15/6/23 + */ +public class GaugeData extends MetricBaseData implements KVSerializable { + private double v; + + public double getV() { + return v; + } + + public void setV(double v) { + this.v = v; + } + + @Override + public byte[] getValue() { + return Bytes.toBytes(v); + } + + @Override + public Object fromKV(byte[] key, byte[] value) { + parseKey(key); + this.v = Bytes.toDouble(value); + + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java deleted file mode 100755 index 7276fdf..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import com.alibaba.jstorm.common.metric.operator.convert.Convertor; -import com.alibaba.jstorm.common.metric.operator.merger.AvgMerger; -import com.alibaba.jstorm.common.metric.operator.updater.AvgUpdater; -import com.alibaba.jstorm.common.metric.window.Metric; - -/** - * Meter is used to compute tps - * - * Attention: 1. - * - * @author zhongyan.feng - * - */ -public class Histogram extends Metric<Double, Histogram.HistorgramPair> { - private static final long serialVersionUID = -1362345159511508074L; - - public Histogram() { - defaultValue = - new HistorgramPair(); - updater = new AvgUpdater(); - merger = new AvgMerger(); - convertor = new HistogramConvertor(); - - init(); - } - - public static class HistogramConvertor implements - Convertor<HistorgramPair, Double> { - private static final long serialVersionUID = -1569170826785657226L; - - @Override - public Double convert(HistorgramPair from) { - // TODO Auto-generated method stub - if (from == null) { - return 0.0d; - } - - if (from.getTimes() == 0) { - return 0.0d; - } else { - return from.getSum()/ from.getTimes(); - } - } - - } - - public static class HistorgramPair { - private double sum; - private long times; - - public HistorgramPair() { - - } - - public HistorgramPair(double sum, long times){ - this.sum = sum; - this.times = times; - } - - public double getSum() { - return sum; - } - - public void setSum(double sum) { - this.sum = sum; - } - - public void addValue(double value) { - sum += value; - } - - public long getTimes() { - return times; - } - - public void setTimes(long times) { - this.times = times; - } - - public void addTimes(long time) { - times += time; - } - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak deleted file mode 100755 index b830789..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import java.util.concurrent.atomic.AtomicLong; - -import com.alibaba.jstorm.common.metric.operator.convert.Convertor; -import com.alibaba.jstorm.common.metric.operator.merger.AvgMerger2; -import com.alibaba.jstorm.common.metric.operator.updater.AvgUpdater2; -import com.alibaba.jstorm.common.metric.window.Metric; -import com.alibaba.jstorm.utils.Pair; -import com.google.common.util.concurrent.AtomicDouble; - -/** - * Meter is used to compute tps - * - * Attention: 1. - * - * @author zhongyan.feng - * - */ -public class Histogram extends Metric<Double, Pair<AtomicDouble, AtomicLong>> { - private static final long serialVersionUID = -1362345159511508074L; - - public Histogram() { - defaultValue = - new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0), - new AtomicLong(0)); - updater = new AvgUpdater2(); - merger = new AvgMerger2(); - convertor = new HistogramConvertor(); - - init(); - } - - public static class HistogramConvertor implements - Convertor<Pair<AtomicDouble, AtomicLong>, Double> { - private static final long serialVersionUID = -1569170826785657226L; - - @Override - public Double convert(Pair<AtomicDouble, AtomicLong> from) { - // TODO Auto-generated method stub - if (from == null) { - return 0.0d; - } - - if (from.getSecond().get() == 0) { - return 0.0d; - } else { - return from.getFirst().get() / from.getSecond().get(); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java new file mode 100644 index 0000000..5f5ae97 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java @@ -0,0 +1,135 @@ +package com.alibaba.jstorm.common.metric; + + +import com.alibaba.jstorm.metric.Bytes; +import com.alibaba.jstorm.metric.KVSerializable; + +/** + * @author wange + * @since 15/6/23 + */ +public class HistogramData extends MetricBaseData implements KVSerializable { + private long min; + private long max; + private double mean; + private double p50; + private double p75; + private double p95; + private double p98; + private double p99; + private double p999; + private double stddev; + + public long getMin() { + return min; + } + + public void setMin(long min) { + this.min = min; + } + + public long getMax() { + return max; + } + + public void setMax(long max) { + this.max = max; + } + + public double getMean() { + return mean; + } + + public void setMean(double mean) { + this.mean = mean; + } + + public double getP50() { + return p50; + } + + public void setP50(double p50) { + this.p50 = p50; + } + + public double getP75() { + return p75; + } + + public void setP75(double p75) { + this.p75 = p75; + } + + public double getP95() { + return p95; + } + + public void setP95(double p95) { + this.p95 = p95; + } + + public double getP98() { + return p98; + } + + public void setP98(double p98) { + this.p98 = p98; + } + + public double getP99() { + return p99; + } + + public void setP99(double p99) { + this.p99 = p99; + } + + public double getP999() { + return p999; + } + + public void setP999(double p999) { + this.p999 = p999; + } + + public double getStddev() { + return stddev; + } + + public void setStddev(double stddev) { + this.stddev = stddev; + } + + @Override + public byte[] getValue() { + byte[] ret = new byte[8 * 9]; + Bytes.putLong(ret, 0, min); + Bytes.putLong(ret, 8, max); + Bytes.putDouble(ret, 16, p50); + Bytes.putDouble(ret, 24, p75); + Bytes.putDouble(ret, 32, p95); + Bytes.putDouble(ret, 40, p98); + Bytes.putDouble(ret, 48, p99); + Bytes.putDouble(ret, 56, p999); + Bytes.putDouble(ret, 64, mean); + + return ret; + } + + @Override + public Object fromKV(byte[] key, byte[] value) { + parseKey(key); + + this.min = Bytes.toLong(value, 0, KVSerializable.LONG_SIZE); + this.max = Bytes.toLong(value, 8, KVSerializable.LONG_SIZE); + this.p50 = Bytes.toDouble(value, 16); + this.p75 = Bytes.toDouble(value, 24); + this.p95 = Bytes.toDouble(value, 32); + this.p98 = Bytes.toDouble(value, 40); + this.p99 = Bytes.toDouble(value, 48); + this.p999 = Bytes.toDouble(value, 56); + this.mean = Bytes.toDouble(value, 64); + + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java deleted file mode 100755 index ac58912..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import java.util.concurrent.atomic.AtomicLong; - -import com.alibaba.jstorm.common.metric.operator.convert.AtomicLongToLong; -import com.alibaba.jstorm.common.metric.operator.merger.LongSumMerger; -import com.alibaba.jstorm.common.metric.operator.updater.LongAddUpdater; -import com.alibaba.jstorm.common.metric.window.Metric; - -public class LongCounter extends Metric<Long, AtomicLong> { - private static final long serialVersionUID = -1362345159511508074L; - - public LongCounter() { - super.defaultValue = new AtomicLong(0); - super.updater = new LongAddUpdater(); - super.merger = new LongSumMerger(); - super.convertor = new AtomicLongToLong(); - - init(); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java deleted file mode 100755 index e56d025..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import com.alibaba.jstorm.common.metric.operator.convert.DefaultConvertor; -import com.alibaba.jstorm.common.metric.operator.merger.TpsMerger; -import com.alibaba.jstorm.common.metric.operator.updater.AddUpdater; -import com.alibaba.jstorm.common.metric.window.Metric; -import com.alibaba.jstorm.common.metric.window.RollingWindow; - -/** - * Meter is used to compute tps - * - * Attention: 1. - * - * @author zhongyan.feng - * - */ -public class Meter extends Metric<Double, Double> { - private static final long serialVersionUID = -1362345159511508074L; - - public Meter() { - defaultValue = 0.0d; - updater = new AddUpdater<Double>(); - merger = new TpsMerger(); - convertor = new DefaultConvertor<Double>(); - - init(); - } - - public void update() { - update(Double.valueOf(1)); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java new file mode 100644 index 0000000..2df87aa --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java @@ -0,0 +1,71 @@ +package com.alibaba.jstorm.common.metric; + + +import com.alibaba.jstorm.metric.Bytes; +import com.alibaba.jstorm.metric.KVSerializable; + +/** + * @author wange + * @since 15/6/23 + */ +public class MeterData extends MetricBaseData implements KVSerializable { + private double m1; + private double m5; + private double m15; + private double mean; + + public double getM1() { + return m1; + } + + public void setM1(double m1) { + this.m1 = m1; + } + + public double getM5() { + return m5; + } + + public void setM5(double m5) { + this.m5 = m5; + } + + public double getM15() { + return m15; + } + + public void setM15(double m15) { + this.m15 = m15; + } + + public double getMean() { + return mean; + } + + public void setMean(double mean) { + this.mean = mean; + } + + @Override + public byte[] getValue() { + byte[] ret = new byte[8 * 4]; + Bytes.putDouble(ret, 0, m1); + Bytes.putDouble(ret, 8, m5); + Bytes.putDouble(ret, 16, m15); + Bytes.putDouble(ret, 24, mean); + + return ret; + } + + @Override + public Object fromKV(byte[] key, byte[] value) { + parseKey(key); + + this.m1 = Bytes.toDouble(value, 0); + this.m5 = Bytes.toDouble(value, 8); + this.m15 = Bytes.toDouble(value, 16); + this.mean = Bytes.toDouble(value, 24); + + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java new file mode 100644 index 0000000..907762d --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java @@ -0,0 +1,59 @@ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.metric.Bytes; +import com.alibaba.jstorm.metric.KVSerializable; + +import java.util.Date; + +/** + * @author wange + * @since 15/7/22 + */ +public abstract class MetricBaseData implements KVSerializable { + protected long metricId; + protected int win; + protected Date ts; + + public long getMetricId() { + return metricId; + } + + public void setMetricId(long metricId) { + this.metricId = metricId; + } + + public Date getTs() { + return ts; + } + + public void setTs(Date ts) { + this.ts = ts; + } + + public int getWin() { + return win; + } + + public void setWin(int win) { + this.win = win; + } + + @Override + public byte[] getKey() { + return makeKey(metricId, win, ts.getTime()); + } + + public static byte[] makeKey(long metricId, int win, long ts) { + byte[] ret = new byte[8 + 4 + 8]; + Bytes.putLong(ret, 0, metricId); + Bytes.putInt(ret, 8, win); + Bytes.putLong(ret, 12, ts); + return ret; + } + + protected void parseKey(byte[] key) { + this.metricId = Bytes.toLong(key, 0, KVSerializable.LONG_SIZE); + this.win = Bytes.toInt(key, 8, KVSerializable.INT_SIZE); + this.ts = new Date(Bytes.toLong(key, 12, KVSerializable.LONG_SIZE)); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java deleted file mode 100755 index 92b1f6b..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import java.io.Serializable; - -import com.alibaba.jstorm.common.metric.window.Metric; - -public interface MetricFilter extends Serializable { - /** - * Matches all metrics, regardless of type or name. - */ - MetricFilter ALL = new MetricFilter() { - private static final long serialVersionUID = 7089987006352295530L; - - @Override - public boolean matches(String name, Metric metric) { - return true; - } - }; - - /** - * Returns {@code true} if the metric matches the filter; {@code false} - * otherwise. - * - * @param name the metric's name - * @param metric the metric - * @return {@code true} if the metric matches the filter - */ - boolean matches(String name, Metric metric); -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java new file mode 100644 index 0000000..da6b4dd --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java @@ -0,0 +1,213 @@ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.metric.KVSerializable; +import com.alibaba.jstorm.metric.MetaType; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.utils.JStormUtils; + +import java.util.Date; + +/** + * @author wange + * @since 15/6/18 + */ +public class MetricMeta implements KVSerializable { + // common + private long id; + // string id + private String sid; + private String clusterName; + private String topologyId; + private int metricType; + private String metricGroup = MetricUtils.DEFAULT_GROUP;//sys group + private String metricName; + private Date gmtCreate = new Date(); + + // task meta + private String component = MetricUtils.EMPTY; + private int taskId = 0; + private String streamId = MetricUtils.EMPTY; + private int metaType; + + // worker meta + private String host = MetricUtils.EMPTY; + private int port = 0; + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + this.sid = id + ""; + } + + public String getSid() { + return sid; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getTopologyId() { + return topologyId; + } + + public void setTopologyId(String topologyId) { + this.topologyId = topologyId; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public int getMetricType() { + return metricType; + } + + public void setMetricType(int metricType) { + this.metricType = metricType; + } + + public String getMetricGroup() { + return metricGroup; + } + + public void setMetricGroup(String metricGroup) { + this.metricGroup = metricGroup; + } + + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + public Date getGmtCreate() { + return gmtCreate; + } + + public void setGmtCreate(Date gmtCreate) { + this.gmtCreate = gmtCreate; + } + + public String getComponent() { + return component; + } + + public void setComponent(String component) { + this.component = component; + } + + public int getTaskId() { + return taskId; + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + public String getStreamId() { + return streamId; + } + + public void setStreamId(String streamId) { + this.streamId = streamId; + } + + public int getMetaType() { + return metaType; + } + + public void setMetaType(int metaType) { + this.metaType = metaType; + } + + public boolean isWorkerMetric() { + return this.metaType == MetaType.NETTY.getT() || this.getMetaType() == MetaType.WORKER.getT() || + this.metaType == MetaType.TOPOLOGY.getT(); + } + + public String getFQN() { + MetaType meta = MetaType.parse(metaType); + MetricType metric = MetricType.parse(metricType); + String types = meta.getV() + metric.getV(); + if (isWorkerMetric()) { + return MetricUtils.concat2(types, topologyId, host, port, metricGroup, metricName); + } + return MetricUtils.concat2(types, topologyId, component, taskId, streamId, metricGroup, metricName); + } + + /** + * key: clusterName + topologyId + metaType + id + */ + @Override + public byte[] getKey() { + StringBuilder sb = new StringBuilder(64); + sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT) + .append(metaType).append(MetricUtils.AT).append(id); + return sb.toString().getBytes(); + } + + /** + * value: component + taskId + streamId + metricType + host + port + metricGroup + metricName + */ + @Override + public byte[] getValue() { + StringBuilder sb = new StringBuilder(64); + sb.append(component).append(MetricUtils.AT).append(taskId).append(MetricUtils.AT) + .append(streamId).append(MetricUtils.AT).append(metricType).append(MetricUtils.AT) + .append(host).append(MetricUtils.AT).append(port).append(MetricUtils.AT) + .append(metricGroup).append(MetricUtils.AT).append(metricName); + return sb.toString().getBytes(); + } + + @Override + public Object fromKV(byte[] key, byte[] value) { + String[] keyParts = new String(key).split(MetricUtils.DELIM); + if (keyParts.length >= 4) { + this.clusterName = keyParts[0]; + this.topologyId = keyParts[1]; + this.metaType = Integer.valueOf(keyParts[2]); + this.id = Long.valueOf(keyParts[3]); + this.sid = this.id + ""; + } + String[] valueParts = new String(value).split(MetricUtils.DELIM); + if (valueParts.length >= 8) { + this.component = valueParts[0]; + this.taskId = JStormUtils.parseInt(valueParts[1], 0); + this.streamId = valueParts[2]; + this.metricType = JStormUtils.parseInt(valueParts[3], 0); + this.host = valueParts[4]; + this.port = JStormUtils.parseInt(valueParts[5], 0); + this.metricGroup = valueParts[6]; + this.metricName = valueParts[7]; + } + return this; + } + + public static MetricMeta parse(String name) { + return MetricMetaParser.fromMetricName(name); + } + +}
