http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java index da69070..0cc16e4 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -17,27 +18,6 @@ */ package com.alibaba.jstorm.daemon.worker; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URL; -import java.security.InvalidParameterException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; @@ -48,28 +28,38 @@ import backtype.storm.scheduler.WorkerSlot; import backtype.storm.utils.DisruptorQueue; import backtype.storm.utils.Utils; import backtype.storm.utils.WorkerClassLoader; - import com.alibaba.jstorm.callback.AsyncLoopDefaultKill; +import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.cluster.Cluster; -import com.alibaba.jstorm.cluster.ClusterState; -import com.alibaba.jstorm.cluster.Common; -import com.alibaba.jstorm.cluster.StormClusterState; -import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.common.metric.window.Metric; +import com.alibaba.jstorm.cluster.*; +import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.common.metric.AsmMetric; import com.alibaba.jstorm.daemon.nimbus.StatusType; import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger; import com.alibaba.jstorm.message.netty.ControlMessage; +import com.alibaba.jstorm.metric.*; import com.alibaba.jstorm.schedule.Assignment; -import com.alibaba.jstorm.schedule.Assignment.AssignmentType; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; import com.alibaba.jstorm.task.TaskShutdownDameon; import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.zk.ZkTool; +import com.codahale.metrics.Gauge; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URL; +import java.security.InvalidParameterException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.alibaba.jstorm.schedule.Assignment.AssignmentType; public class WorkerData { @@ -112,6 +102,8 @@ public class WorkerData { private volatile Set<Integer> outboundTasks; private Set<Integer> localTasks; + private Set<Integer> localNodeTasks; + private ConcurrentHashMap<Integer, DisruptorQueue> innerTaskTransfer; private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues; @@ -152,15 +144,18 @@ public class WorkerData { private ScheduledExecutorService threadPool; private volatile Long assignmentTS; // Assignment timeStamp. The time of - // last update of assignment + // last update of assignment + private volatile AssignmentType assignmentType; - + private IConnection recvConnection; - @SuppressWarnings({ "rawtypes", "unchecked" }) - public WorkerData(Map conf, IContext context, String topology_id, - String supervisor_id, int port, String worker_id, String jar_path) - throws Exception { + private JStormMetricsReporter metricReporter; + + private AsyncLoopThread healthReporterThread; + + @SuppressWarnings({"rawtypes", "unchecked"}) + public WorkerData(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception { this.conf = conf; this.context = context; @@ -170,7 +165,7 @@ public class WorkerData { this.workerId = worker_id; this.shutdown = new AtomicBoolean(false); - + this.monitorEnable = new AtomicBoolean(true); this.topologyStatus = StatusType.active; @@ -183,29 +178,54 @@ public class WorkerData { this.zkClusterstate = ZkTool.mk_distributed_cluster_state(conf); this.zkCluster = Cluster.mk_storm_cluster_state(zkClusterstate); - Map rawConf = - StormConfig.read_supervisor_topology_conf(conf, topology_id); + Map rawConf = StormConfig.read_supervisor_topology_conf(conf, topology_id); this.stormConf = new HashMap<Object, Object>(); this.stormConf.putAll(conf); this.stormConf.putAll(rawConf); - + + JStormMetrics.setTopologyId(topology_id); + JStormMetrics.setPort(port); + JStormMetrics.setDebug(ConfigExtension.isEnableMetricDebug(stormConf)); + JStormMetrics.setEnabled(ConfigExtension.isEnableMetrics(stormConf)); + JStormMetrics.addDebugMetrics(ConfigExtension.getDebugMetricNames(stormConf)); + AsmMetric.setSampleRate(ConfigExtension.getMetricSampleRate(stormConf)); + ConfigExtension.setLocalSupervisorId(stormConf, supervisorId); ConfigExtension.setLocalWorkerId(stormConf, workerId); ConfigExtension.setLocalWorkerPort(stormConf, port); ControlMessage.setPort(port); - Metric.setEnable(ConfigExtension.isEnablePerformanceMetrics(stormConf)); + + JStormMetrics.registerWorkerTopologyMetric( + JStormMetrics.workerMetricName(MetricDef.CPU_USED_RATIO, MetricType.GAUGE), + new AsmGauge(new Gauge<Double>() { + @Override + public Double getValue() { + return JStormUtils.getCpuUsage(); + } + })); + + JStormMetrics.registerWorkerTopologyMetric(JStormMetrics.workerMetricName(MetricDef.MEMORY_USED, MetricType.GAUGE), + new AsmGauge(new Gauge<Double>() { + @Override + public Double getValue() { + return JStormUtils.getMemUsage(); + } + })); + + JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.DISK_USAGE, MetricType.GAUGE), new AsmGauge(new Gauge<Double>() { + @Override + public Double getValue() { + return JStormUtils.getDiskUsage(); + } + })); LOG.info("Worker Configuration " + stormConf); try { + boolean enableClassloader = ConfigExtension.isEnableTopologyClassLoader(stormConf); + boolean enableDebugClassloader = ConfigExtension.isEnableClassloaderDebug(stormConf); - boolean enableClassloader = - ConfigExtension.isEnableTopologyClassLoader(stormConf); - boolean enableDebugClassloader = - ConfigExtension.isEnableClassloaderDebug(stormConf); - - if (jar_path == null && enableClassloader == true - && !conf.get(Config.STORM_CLUSTER_MODE).equals("local")) { + if (jar_path == null && enableClassloader == true && !conf.get(Config.STORM_CLUSTER_MODE).equals("local")) { LOG.error("enable classloader, but not app jar"); throw new InvalidParameterException(); } @@ -221,14 +241,11 @@ public class WorkerData { urls.add(url); } urlArray = urls.toArray(new URL[0]); - } - WorkerClassLoader.mkInstance(urlArray, ClassLoader - .getSystemClassLoader(), ClassLoader.getSystemClassLoader() - .getParent(), enableClassloader, enableDebugClassloader); + WorkerClassLoader.mkInstance(urlArray, ClassLoader.getSystemClassLoader(), ClassLoader.getSystemClassLoader().getParent(), enableClassloader, + enableDebugClassloader); } catch (Exception e) { - // TODO Auto-generated catch block LOG.error("init jarClassLoader error!", e); throw new InvalidParameterException(); } @@ -237,39 +254,27 @@ public class WorkerData { this.context = TransportFactory.makeContext(stormConf); } - boolean disruptorUseSleep = - ConfigExtension.isDisruptorUseSleep(stormConf); + boolean disruptorUseSleep = ConfigExtension.isDisruptorUseSleep(stormConf); DisruptorQueue.setUseSleep(disruptorUseSleep); - boolean isLimited = - ConfigExtension.getTopologyBufferSizeLimited(stormConf); + boolean isLimited = ConfigExtension.getTopologyBufferSizeLimited(stormConf); DisruptorQueue.setLimited(isLimited); - LOG.info("Disruptor use sleep:" + disruptorUseSleep + ", limited size:" - + isLimited); + LOG.info("Disruptor use sleep:" + disruptorUseSleep + ", limited size:" + isLimited); // this.transferQueue = new LinkedBlockingQueue<TransferData>(); - int buffer_size = - Utils.getInt(conf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE)); - WaitStrategy waitStrategy = - (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(conf); - this.transferQueue = - DisruptorQueue.mkInstance("TotalTransfer", ProducerType.MULTI, - buffer_size, waitStrategy); + int buffer_size = Utils.getInt(stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE)); + WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf); + this.transferQueue = DisruptorQueue.mkInstance("TotalTransfer", ProducerType.MULTI, buffer_size, waitStrategy); this.transferQueue.consumerStarted(); - this.sendingQueue = - DisruptorQueue.mkInstance("TotalSending", ProducerType.MULTI, - buffer_size, waitStrategy); + this.sendingQueue = DisruptorQueue.mkInstance("TotalSending", ProducerType.MULTI, buffer_size, waitStrategy); this.sendingQueue.consumerStarted(); this.nodeportSocket = new ConcurrentHashMap<WorkerSlot, IConnection>(); this.taskNodeport = new ConcurrentHashMap<Integer, WorkerSlot>(); this.workerToResource = new ConcurrentSkipListSet<ResourceWorkerSlot>(); - this.innerTaskTransfer = - new ConcurrentHashMap<Integer, DisruptorQueue>(); - this.deserializeQueues = - new ConcurrentHashMap<Integer, DisruptorQueue>(); + this.innerTaskTransfer = new ConcurrentHashMap<Integer, DisruptorQueue>(); + this.deserializeQueues = new ConcurrentHashMap<Integer, DisruptorQueue>(); this.tasksToComponent = new ConcurrentHashMap<Integer, String>(); - this.componentToSortedTasks = - new ConcurrentHashMap<String, List<Integer>>(); + this.componentToSortedTasks = new ConcurrentHashMap<String, List<Integer>>(); Assignment assignment = zkCluster.assignment_info(topologyId, null); if (assignment == null) { @@ -288,8 +293,7 @@ public class WorkerData { LOG.info("Current worker taskList:" + taskids); // deserialize topology code from local dir - rawTopology = - StormConfig.read_supervisor_topology_code(conf, topology_id); + rawTopology = StormConfig.read_supervisor_topology_code(conf, topology_id); sysTopology = Common.system_topology(stormConf, rawTopology); generateMaps(); @@ -301,15 +305,17 @@ public class WorkerData { threadPool = Executors.newScheduledThreadPool(THREAD_POOL_NUM); TimerTrigger.setScheduledExecutorService(threadPool); + if (!StormConfig.local_mode(stormConf)) { + healthReporterThread = new AsyncLoopThread(new JStormHealthReporter(this)); + } + try { - Long tmp = - StormConfig.read_supervisor_topology_timestamp(conf, - topology_id); + Long tmp = StormConfig.read_supervisor_topology_timestamp(conf, topology_id); assignmentTS = (tmp == null ? System.currentTimeMillis() : tmp); } catch (FileNotFoundException e) { assignmentTS = System.currentTimeMillis(); } - + outboundTasks = new HashSet<Integer>(); LOG.info("Successfully create WorkerData"); @@ -317,13 +323,10 @@ public class WorkerData { } /** - * private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport; private - * HashMap<Integer, String> tasksToComponent; private Map<String, - * List<Integer>> componentToSortedTasks; private Map<String, Map<String, - * Fields>> componentToStreamToFields; private Map<String, Object> - * defaultResources; private Map<String, Object> userResources; private - * Map<String, Object> executorData; private Map registeredMetrics; - * + * private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport; private HashMap<Integer, String> tasksToComponent; private Map<String, List<Integer>> + * componentToSortedTasks; private Map<String, Map<String, Fields>> componentToStreamToFields; private Map<String, Object> defaultResources; private + * Map<String, Object> userResources; private Map<String, Object> executorData; private Map registeredMetrics; + * * @throws Exception */ private void generateMaps() throws Exception { @@ -335,7 +338,7 @@ public class WorkerData { this.registeredMetrics = new HashMap(); } - public Map<Object, Object> getConf() { + public Map<Object, Object> getRawConf() { return conf; } @@ -351,6 +354,10 @@ public class WorkerData { this.topologyStatus = topologyStatus; } + public Map<Object, Object> getConf() { + return stormConf; + } + public Map<Object, Object> getStormConf() { return stormConf; } @@ -396,7 +403,19 @@ public class WorkerData { } public ConcurrentSkipListSet<ResourceWorkerSlot> getWorkerToResource() { - return workerToResource; + synchronized (workerToResource) { + return workerToResource; + } + } + + public void updateWorkerToResource(Set<ResourceWorkerSlot> workers) { + synchronized (workerToResource) { + Set<ResourceWorkerSlot> oldWorkers = workerToResource.clone(); + oldWorkers.removeAll(workers); + if (oldWorkers.size() > 0) + workerToResource.removeAll(workers); + workerToResource.addAll(workers); + } } public ConcurrentHashMap<Integer, DisruptorQueue> getInnerTaskTransfer() { @@ -471,8 +490,7 @@ public class WorkerData { this.shutdownTasks.add(shutdownTask); } - public List<TaskShutdownDameon> getShutdownDaemonbyTaskIds( - Set<Integer> taskIds) { + public List<TaskShutdownDameon> getShutdownDaemonbyTaskIds(Set<Integer> taskIds) { List<TaskShutdownDameon> ret = new ArrayList<TaskShutdownDameon>(); for (TaskShutdownDameon shutdown : shutdownTasks) { if (taskIds.contains(shutdown.getTaskId())) @@ -494,7 +512,7 @@ public class WorkerData { outTaskStatus.put(taskId, false); } } - + public Map<Integer, Boolean> getOutboundTaskStatus() { return outTaskStatus; } @@ -502,7 +520,7 @@ public class WorkerData { public void addOutboundTaskStatusIfAbsent(Integer taskId) { outTaskStatus.putIfAbsent(taskId, false); } - + public void removeOutboundTaskStatus(Integer taskId) { outTaskStatus.remove(taskId); } @@ -512,8 +530,7 @@ public class WorkerData { } public boolean isOutboundTaskActive(Integer taskId) { - return outTaskStatus.get(taskId) != null ? outTaskStatus.get(taskId) - : false; + return outTaskStatus.get(taskId) != null ? outTaskStatus.get(taskId) : false; } public ScheduledExecutorService getThreadPool() { @@ -527,11 +544,11 @@ public class WorkerData { public Long getAssignmentTs() { return assignmentTS; } - + public void setAssignmentType(AssignmentType type) { this.assignmentType = type; } - + public AssignmentType getAssignmentType() { return assignmentType; } @@ -544,28 +561,33 @@ public class WorkerData { public void updateTaskIds(Assignment assignment) { this.taskids.clear(); - this.taskids.addAll(assignment - .getCurrentWorkerTasks(supervisorId, port)); + this.taskids.addAll(assignment.getCurrentWorkerTasks(supervisorId, port)); + } + + public Set<Integer> getLocalNodeTasks() { + return localNodeTasks; + } + + public void setLocalNodeTasks(Set<Integer> localNodeTasks) { + this.localNodeTasks = localNodeTasks; } public void setOutboundTasks(Set<Integer> outboundTasks) { this.outboundTasks = outboundTasks; } - + public Set<Integer> getOutboundTasks() { return outboundTasks; } private void updateTaskComponentMap() throws Exception { - Map<Integer, String> tmp = Common.getTaskToComponent( - Cluster.get_all_taskInfo(zkCluster, topologyId)); + Map<Integer, String> tmp = Common.getTaskToComponent(Cluster.get_all_taskInfo(zkCluster, topologyId)); this.tasksToComponent.putAll(tmp); LOG.info("Updated tasksToComponentMap:" + tasksToComponent); this.componentToSortedTasks.putAll(JStormUtils.reverse_map(tmp)); - for (java.util.Map.Entry<String, List<Integer>> entry : componentToSortedTasks - .entrySet()) { + for (Map.Entry<String, List<Integer>> entry : componentToSortedTasks.entrySet()) { List<Integer> tasks = entry.getValue(); Collections.sort(tasks); @@ -573,16 +595,13 @@ public class WorkerData { } private void updateStormTopology() { - StormTopology rawTmp = null; - StormTopology sysTmp = null; - + StormTopology rawTmp; + StormTopology sysTmp; try { - rawTmp = - StormConfig.read_supervisor_topology_code(conf, topologyId); + rawTmp = StormConfig.read_supervisor_topology_code(conf, topologyId); sysTmp = Common.system_topology(stormConf, rawTopology); } catch (IOException e) { - LOG.error("Failed to read supervisor topology code for " - + topologyId, e); + LOG.error("Failed to read supervisor topology code for " + topologyId, e); return; } catch (InvalidTopologyException e) { LOG.error("Failed to update sysTopology for " + topologyId, e); @@ -593,8 +612,7 @@ public class WorkerData { updateTopology(sysTopology, sysTmp); } - private void updateTopology(StormTopology oldTopology, - StormTopology newTopology) { + private void updateTopology(StormTopology oldTopology, StormTopology newTopology) { oldTopology.set_bolts(newTopology.get_bolts()); oldTopology.set_spouts(newTopology.get_spouts()); oldTopology.set_state_spouts(newTopology.get_state_spouts()); @@ -604,12 +622,19 @@ public class WorkerData { return monitorEnable; } - public IConnection getRecvConnection() { - return recvConnection; - } + public IConnection getRecvConnection() { + return recvConnection; + } - public void setRecvConnection(IConnection recvConnection) { - this.recvConnection = recvConnection; - } + public void setRecvConnection(IConnection recvConnection) { + this.recvConnection = recvConnection; + } + public JStormMetricsReporter getMetricsReporter() { + return metricReporter; + } + + public void setMetricsReporter(JStormMetricsReporter metricReporter) { + this.metricReporter = metricReporter; + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java index d8ec622..9d1cca7 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java @@ -25,8 +25,7 @@ import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; /** - * Worker's Heartbeat data woker will update the object to - * /LOCAL-DIR/workers/${woker-id}/heartbeats + * Worker's Heartbeat data woker will update the object to /LOCAL-DIR/workers/${woker-id}/heartbeats * * @author yannian/Longda * @@ -39,8 +38,7 @@ public class WorkerHeartbeat implements Serializable { private Set<Integer> taskIds; private Integer port; - public WorkerHeartbeat(int timeSecs, String topologyId, - Set<Integer> taskIds, Integer port) { + public WorkerHeartbeat(int timeSecs, String topologyId, Set<Integer> taskIds, Integer port) { this.timeSecs = timeSecs; this.topologyId = topologyId; @@ -83,7 +81,6 @@ public class WorkerHeartbeat implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java new file mode 100644 index 0000000..79075bc --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java @@ -0,0 +1,38 @@ +package com.alibaba.jstorm.daemon.worker; + +import com.alibaba.jstorm.cluster.StormClusterState; +import com.alibaba.jstorm.utils.TimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Set; + +/** + * Created by xiaojian.fxj on 2015/8/12. + */ +public class WorkerReportError { + private static Logger LOG = LoggerFactory.getLogger(WorkerReportError.class); + private StormClusterState zkCluster; + private String hostName; + + public WorkerReportError(StormClusterState _storm_cluster_state, + String _hostName) { + this.zkCluster = _storm_cluster_state; + this.hostName = _hostName; + } + public void report(String topology_id, Integer worker_port, + Set<Integer> tasks, String error) { + // Report worker's error to zk + try { + Date now = new Date(); + String nowStr = TimeFormat.getSecond(now); + String errorInfo = error + "on " + this.hostName + ":" + worker_port + "," + nowStr; + for (Integer task : tasks){ + zkCluster.report_task_error(topology_id, task, errorInfo, null); + } + } catch (Exception e) { + LOG.error("Failed update "+worker_port+ "errors to ZK" + "\n", e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java index 0691fee..403c8cf 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java @@ -86,14 +86,13 @@ public class WorkerShutdown implements ShutdownableDameon { LOG.info("Worker has been shutdown already"); return; } - - if(recvConnection != null) { - recvConnection.close(); + + if (recvConnection != null) { + recvConnection.close(); } AsyncLoopRunnable.getShutdown().set(true); threadPool.shutdown(); - // shutdown tasks for (ShutdownableDameon task : shutdowntasks) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java index 783f584..ab4213f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java @@ -35,8 +35,7 @@ import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; public class SyncContainerHb extends RunnableCallback { - private final static Logger LOG = LoggerFactory - .getLogger(SyncContainerHb.class); + private final static Logger LOG = LoggerFactory.getLogger(SyncContainerHb.class); private String readDir; private String writeDir; @@ -113,8 +112,7 @@ public class SyncContainerHb extends RunnableCallback { try { hb = Long.valueOf(biggest); } catch (Exception e) { - LOG.info("Heartbeat file " + biggest - + " isn't a valid file, remove it"); + LOG.info("Heartbeat file " + biggest + " isn't a valid file, remove it"); String path = readDir + File.separator + biggest; try { @@ -151,8 +149,7 @@ public class SyncContainerHb extends RunnableCallback { return; } - String seconds = - String.valueOf(System.currentTimeMillis() / SECOND_MILLISCOND); + String seconds = String.valueOf(System.currentTimeMillis() / SECOND_MILLISCOND); String path = writeDir + File.separator + seconds; @@ -289,8 +286,7 @@ public class SyncContainerHb extends RunnableCallback { this.reserverNum = reserverNum; } - public static AsyncLoopThread mkInstance(String containerHbDir, - String hbDir, int timeout, int frequence) { + public static AsyncLoopThread mkInstance(String containerHbDir, String hbDir, int timeout, int frequence) { SyncContainerHb syncContainerHbThread = new SyncContainerHb(); syncContainerHbThread.setReadDir(containerHbDir); @@ -306,9 +302,7 @@ public class SyncContainerHb extends RunnableCallback { sb.append(",frequence:").append(frequence); LOG.info(sb.toString()); - AsyncLoopThread thread = - new AsyncLoopThread(syncContainerHbThread, true, - Thread.NORM_PRIORITY, true); + AsyncLoopThread thread = new AsyncLoopThread(syncContainerHbThread, true, Thread.NORM_PRIORITY, true); return thread; } @@ -329,31 +323,23 @@ public class SyncContainerHb extends RunnableCallback { } - public static AsyncLoopThread mkSupervisorInstance(Map conf) - throws IOException { - boolean isEnableContainer = - ConfigExtension.isEnableContainerSupervisor(); + public static AsyncLoopThread mkSupervisorInstance(Map conf) throws IOException { + boolean isEnableContainer = ConfigExtension.isEnableContainerSupervisor(); if (isEnableContainer) { - String containerHbDir = - ConfigExtension.getContainerSupervisorHearbeat(); + String containerHbDir = ConfigExtension.getContainerSupervisorHearbeat(); String hbDir = StormConfig.supervisorHearbeatForContainer(conf); - int timeout = - ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf); - int frequence = - ConfigExtension.getContainerHeartbeatFrequence(conf); + int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf); + int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf); return mkInstance(containerHbDir, hbDir, timeout, frequence); } - boolean isWorkerAutomaticStop = - ConfigExtension.isWorkerStopWithoutSupervisor(conf); + boolean isWorkerAutomaticStop = ConfigExtension.isWorkerStopWithoutSupervisor(conf); if (isWorkerAutomaticStop) { String containerHbDir = null; String hbDir = StormConfig.supervisorHearbeatForContainer(conf); - int timeout = - ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf); - int frequence = - ConfigExtension.getContainerHeartbeatFrequence(conf); + int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf); + int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf); return mkInstance(containerHbDir, hbDir, timeout, frequence); } @@ -364,17 +350,14 @@ public class SyncContainerHb extends RunnableCallback { } public static AsyncLoopThread mkWorkerInstance(Map conf) throws IOException { - boolean isEnableContainer = - ConfigExtension.isEnableContainerSupervisor(); - boolean isWorkerAutomaticStop = - ConfigExtension.isWorkerStopWithoutSupervisor(conf); + boolean isEnableContainer = ConfigExtension.isEnableContainerSupervisor(); + boolean isWorkerAutomaticStop = ConfigExtension.isWorkerStopWithoutSupervisor(conf); if (isEnableContainer == false && isWorkerAutomaticStop == false) { LOG.info("Run worker without Apsara/Yarn container"); return null; } - String containerHbDir = - StormConfig.supervisorHearbeatForContainer(conf); + String containerHbDir = StormConfig.supervisorHearbeatForContainer(conf); String hbDir = null; int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf); int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java index 74aca05..51e74f8 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java @@ -44,8 +44,7 @@ import com.alibaba.jstorm.utils.TimeUtils; * */ public class WorkerHeartbeatRunable extends RunnableCallback { - private static Logger LOG = LoggerFactory - .getLogger(WorkerHeartbeatRunable.class); + private static Logger LOG = LoggerFactory.getLogger(WorkerHeartbeatRunable.class); private WorkerData workerData; @@ -69,8 +68,7 @@ public class WorkerHeartbeatRunable extends RunnableCallback { this.worker_id = workerData.getWorkerId(); this.port = workerData.getPort(); this.topologyId = workerData.getTopologyId(); - this.task_ids = - new CopyOnWriteArraySet<Integer>(workerData.getTaskids()); + this.task_ids = new CopyOnWriteArraySet<Integer>(workerData.getTaskids()); this.shutdown = workerData.getShutdown(); String key = Config.WORKER_HEARTBEAT_FREQUENCY_SECS; @@ -97,11 +95,9 @@ public class WorkerHeartbeatRunable extends RunnableCallback { public void doHeartbeat() throws IOException { int currtime = TimeUtils.current_time_secs(); - WorkerHeartbeat hb = - new WorkerHeartbeat(currtime, topologyId, task_ids, port); + WorkerHeartbeat hb = new WorkerHeartbeat(currtime, topologyId, task_ids, port); - LOG.debug("Doing heartbeat:" + worker_id + ",port:" + port + ",hb" - + hb.toString()); + LOG.debug("Doing heartbeat:" + worker_id + ",port:" + port + ",hb" + hb.toString()); LocalState state = getWorkerState(); state.put(Common.LS_WORKER_HEARTBEAT, hb); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java new file mode 100644 index 0000000..19ba2c9 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.daemon.worker.timer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.jstorm.task.backpressure.BackpressureTrigger; + + +public class BackpressureCheckTrigger extends TimerTrigger{ + private static final Logger LOG = LoggerFactory.getLogger(BackpressureCheckTrigger.class); + + private BackpressureTrigger trigger; + + public BackpressureCheckTrigger(int initDelay, int frequence, String name, BackpressureTrigger trigger) { + if (frequence <= 0) { + LOG.warn(" The frequence of " + name + " is invalid"); + frequence = 1; + } + this.firstTime = initDelay; + this.frequence = frequence; + this.trigger = trigger; + } + + @Override + public void run() { + try { + trigger.checkAndTrigger(); + } catch (Exception e) { + LOG.warn("Failed to publish timer event to " + name, e); + return; + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java index 5a59e6f..a1a0990 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java @@ -29,26 +29,20 @@ import com.alibaba.jstorm.task.acker.Acker; import com.alibaba.jstorm.utils.JStormUtils; public class RotatingMapTrigger extends TimerTrigger { - private static final Logger LOG = LoggerFactory - .getLogger(RotatingMapTrigger.class); + private static final Logger LOG = LoggerFactory.getLogger(RotatingMapTrigger.class); public RotatingMapTrigger(Map conf, String name, DisruptorQueue queue) { this.name = name; this.queue = queue; this.opCode = TimerConstants.ROTATING_MAP; - int msgTimeOut = - JStormUtils.parseInt( - conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30); + int msgTimeOut = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30); frequence = (msgTimeOut) / (Acker.TIMEOUT_BUCKET_NUM - 1); if (frequence <= 0) { frequence = 1; } - firstTime = - JStormUtils.parseInt( - conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS), - 120); + firstTime = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS), 120); firstTime += frequence; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java new file mode 100644 index 0000000..0e00f66 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java @@ -0,0 +1,32 @@ +package com.alibaba.jstorm.daemon.worker.timer; + +import org.apache.log4j.Logger; + +import com.alibaba.jstorm.task.TaskBatchTransfer; + +public class TaskBatchCheckTrigger extends TimerTrigger { + private static final Logger LOG = Logger.getLogger(TickTupleTrigger.class); + + private TaskBatchTransfer batchTransfer; + + public TaskBatchCheckTrigger(int frequence, String name, TaskBatchTransfer transfer) { + if (frequence <= 0) { + LOG.warn(" The frequence of " + name + " is invalid"); + frequence = 1; + } + this.firstTime = frequence; + this.frequence = frequence; + this.batchTransfer = transfer; + } + + @Override + public void run() { + try { + batchTransfer.startCheck(); + } catch (Exception e) { + LOG.warn("Failed to public timer event to " + name, e); + return; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java index 3a5353f..96165d0 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java @@ -22,9 +22,9 @@ import org.slf4j.LoggerFactory; import com.alibaba.jstorm.task.TaskBatchTransfer; -public class TaskBatchFlushTrigger extends TimerTrigger{ +public class TaskBatchFlushTrigger extends TimerTrigger { private static final Logger LOG = LoggerFactory.getLogger(TickTupleTrigger.class); - + private TaskBatchTransfer batchTransfer; public TaskBatchFlushTrigger(int frequence, String name, TaskBatchTransfer transfer) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java index 0b67776..ad81a2b 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java @@ -17,6 +17,7 @@ */ package com.alibaba.jstorm.daemon.worker.timer; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -24,33 +25,66 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.Config; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.TupleExt; +import backtype.storm.tuple.TupleImplExt; +import backtype.storm.tuple.Values; import backtype.storm.utils.DisruptorQueue; -import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger.TimerEvent; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.task.error.ITaskReportErr; +import com.alibaba.jstorm.task.UptimeComputer; +import com.alibaba.jstorm.utils.IntervalCheck; import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; public class TaskHeartbeatTrigger extends TimerTrigger { - private static final Logger LOG = LoggerFactory - .getLogger(TaskHeartbeatTrigger.class); + private static final Logger LOG = LoggerFactory.getLogger(TaskHeartbeatTrigger.class); private int taskId; - - private BlockingQueue<Object> controlQueue; + private String componentId; + private TopologyContext sysTopologyCtx; - public TaskHeartbeatTrigger(Map conf, String name, DisruptorQueue queue, - BlockingQueue<Object> controlQueue, int taskId) { + private BlockingQueue<Object> controlQueue = null; + + private OutputCollector boltOutputCollector = null; + private SpoutOutputCollector spoutOutputCollector = null; + + private long executeThreadHbTime; + private int taskHbTimeout; + + private ITaskReportErr reportError; + + private IntervalCheck intervalCheck; + + private UptimeComputer uptime; + + public TaskHeartbeatTrigger(Map conf, String name, DisruptorQueue queue, BlockingQueue<Object> controlQueue, int taskId, String componentId, + TopologyContext sysTopologyCtx, ITaskReportErr reportError) { this.name = name; this.queue = queue; this.controlQueue = controlQueue; this.opCode = TimerConstants.TASK_HEARTBEAT; this.taskId = taskId; + this.componentId = componentId; + this.sysTopologyCtx = sysTopologyCtx; + + this.frequence = JStormUtils.parseInt(conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10); + this.firstTime = frequence; - frequence = - JStormUtils.parseInt( - conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10); + this.executeThreadHbTime = TimeUtils.current_time_secs(); + this.taskHbTimeout = JStormUtils.parseInt(conf.get(Config.NIMBUS_TASK_TIMEOUT_SECS), 180); + this.intervalCheck = new IntervalCheck(); + this.intervalCheck.setInterval(taskHbTimeout); + this.intervalCheck.start(); - firstTime = frequence; + this.reportError = reportError; + + this.uptime = new UptimeComputer(); } @Override @@ -60,7 +94,6 @@ public class TaskHeartbeatTrigger extends TimerTrigger { @Override public void run() { - try { updateObject(); @@ -69,16 +102,63 @@ public class TaskHeartbeatTrigger extends TimerTrigger { return; } + if (intervalCheck.check()) { + checkExecuteThreadHb(); + } + + if (componentId.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { + Values values = new Values(uptime.uptime()); + TupleExt tuple = new TupleImplExt(sysTopologyCtx, values, taskId, Common.TOPOLOGY_MASTER_HB_STREAM_ID); + queue.publish(tuple); + } else { + // Send task heartbeat to topology master + sendHbMsg(); + } + + // Send message used to monitor execute thread TimerEvent event = new TimerEvent(opCode, object); - - controlQueue.offer(event); - LOG.debug("Offer task HB event to controlQueue, taskId=" + taskId); + boolean ret = controlQueue.offer(event); + if (ret) + LOG.debug("Offer task HB event to controlQueue, taskId=" + taskId); + else + LOG.debug("Failed to offer task HB event to controlQueue, taskId=" + taskId); } catch (Exception e) { - LOG.warn("Failed to public timer event to " + name, e); + LOG.warn("Failed to publish timer event to " + name, e); return; } LOG.debug(" Trigger timer event to " + name); } + + public void setSpoutOutputCollector(SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + public void setBoltOutputCollector(OutputCollector boltOutputCollector) { + this.boltOutputCollector = boltOutputCollector; + } + + public void setExeThreadHbTime(long hbTime) { + this.executeThreadHbTime = hbTime; + } + + private void sendHbMsg() { + List values = JStormUtils.mk_list(uptime.uptime()); + if (spoutOutputCollector != null) { + spoutOutputCollector.emit(Common.TOPOLOGY_MASTER_HB_STREAM_ID, values); + } else if (boltOutputCollector != null) { + boltOutputCollector.emit(Common.TOPOLOGY_MASTER_HB_STREAM_ID, values); + } else { + LOG.warn("Failed to send hearbeat msg. OutputCollector has not been initialized!"); + } + } + + private void checkExecuteThreadHb() { + long currentTime = TimeUtils.current_time_secs(); + if (currentTime - executeThreadHbTime > taskHbTimeout) { + String error = "No response from Task-" + taskId + ", last report time(sec) is " + executeThreadHbTime; + reportError.report(error); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java index ecf01c5..a70d8ab 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java @@ -29,13 +29,11 @@ import backtype.storm.utils.DisruptorQueue; import com.alibaba.jstorm.utils.TimeUtils; public class TickTupleTrigger extends TimerTrigger { - private static final Logger LOG = LoggerFactory - .getLogger(TickTupleTrigger.class); + private static final Logger LOG = LoggerFactory.getLogger(TickTupleTrigger.class); TopologyContext topologyContext; - public TickTupleTrigger(TopologyContext topologyContext, int frequence, - String name, DisruptorQueue queue) { + public TickTupleTrigger(TopologyContext topologyContext, int frequence, String name, DisruptorQueue queue) { this.name = name; this.queue = queue; this.opCode = TimerConstants.TICK_TUPLE; @@ -53,10 +51,7 @@ public class TickTupleTrigger extends TimerTrigger { @Override public void updateObject() { this.object = - new TupleImplExt(topologyContext, new Values( - TimeUtils.current_time_secs()), - (int) Constants.SYSTEM_TASK_ID, - Constants.SYSTEM_TICK_STREAM_ID); + new TupleImplExt(topologyContext, new Values(TimeUtils.current_time_secs()), (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java index 4cecbea..2c2c39c 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java @@ -30,13 +30,11 @@ import backtype.storm.utils.DisruptorQueue; import com.lmax.disruptor.InsufficientCapacityException; public class TimerTrigger implements Runnable { - private static final Logger LOG = LoggerFactory - .getLogger(TimerTrigger.class); + private static final Logger LOG = LoggerFactory.getLogger(TimerTrigger.class); private static ScheduledExecutorService threadPool; - public static void setScheduledExecutorService( - ScheduledExecutorService scheduledExecutorService) { + public static void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { threadPool = scheduledExecutorService; } @@ -44,7 +42,7 @@ public class TimerTrigger implements Runnable { protected int opCode; protected int firstTime; protected int frequence; - protected DisruptorQueue queue; + protected DisruptorQueue queue = null; protected Object object; protected boolean block = true; @@ -53,8 +51,7 @@ public class TimerTrigger implements Runnable { } public void register(TimeUnit timeUnit) { - threadPool.scheduleAtFixedRate(this, firstTime, frequence, - timeUnit); + threadPool.scheduleAtFixedRate(this, firstTime, frequence, timeUnit); LOG.info("Successfully register timer " + this); } @@ -145,8 +142,7 @@ public class TimerTrigger implements Runnable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } public class TimerEvent { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java index 5f16b0b..4d7d32c 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java @@ -31,8 +31,7 @@ import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.TimeUtils; public class ClearThread extends RunnableCallback { - private static final Logger LOG = LoggerFactory - .getLogger(ClearThread.class); + private static final Logger LOG = LoggerFactory.getLogger(ClearThread.class); private final int REQUEST_TIMEOUT_SECS; private static final int TIMEOUT_CHECK_SECS = 5; @@ -42,10 +41,7 @@ public class ClearThread extends RunnableCallback { public ClearThread(Drpc drpc) { drpcService = drpc; - REQUEST_TIMEOUT_SECS = - JStormUtils.parseInt( - drpcService.getConf().get( - Config.DRPC_REQUEST_TIMEOUT_SECS), 60); + REQUEST_TIMEOUT_SECS = JStormUtils.parseInt(drpcService.getConf().get(Config.DRPC_REQUEST_TIMEOUT_SECS), 60); LOG.info("Drpc timeout seconds is " + REQUEST_TIMEOUT_SECS); } @@ -56,8 +52,7 @@ public class ClearThread extends RunnableCallback { if (TimeUtils.time_delta(e.getValue()) > REQUEST_TIMEOUT_SECS) { String id = e.getKey(); - drpcService.getIdtoResult().put(id, - new DRPCExecutionException("Request timed out")); + drpcService.getIdtoResult().put(id, new DRPCExecutionException("Request timed out")); Semaphore s = drpcService.getIdtoSem().get(id); if (s != null) { s.release(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java index 09b4885..adbec06 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.alibaba.jstorm.utils.JStormServerUtils; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.THsHaServer; @@ -53,8 +54,7 @@ import com.alibaba.jstorm.utils.TimeUtils; * @author yannian * */ -public class Drpc implements DistributedRPC.Iface, - DistributedRPCInvocations.Iface, Shutdownable { +public class Drpc implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable { private static final Logger LOG = LoggerFactory.getLogger(Drpc.class); @@ -76,23 +76,18 @@ public class Drpc implements DistributedRPC.Iface, private AtomicBoolean shutdown = new AtomicBoolean(false); - private THsHaServer initHandlerServer(Map conf, final Drpc service) - throws Exception { + private THsHaServer initHandlerServer(Map conf, final Drpc service) throws Exception { int port = JStormUtils.parseInt(conf.get(Config.DRPC_PORT)); - int workerThreadNum = - JStormUtils.parseInt(conf.get(Config.DRPC_WORKER_THREADS)); + int workerThreadNum = JStormUtils.parseInt(conf.get(Config.DRPC_WORKER_THREADS)); int queueSize = JStormUtils.parseInt(conf.get(Config.DRPC_QUEUE_SIZE)); TNonblockingServerSocket socket = new TNonblockingServerSocket(port); THsHaServer.Args targs = new THsHaServer.Args(socket); targs.workerThreads(64); targs.protocolFactory(new TBinaryProtocol.Factory()); - targs.processor(new DistributedRPC.Processor<DistributedRPC.Iface>( - service)); + targs.processor(new DistributedRPC.Processor<DistributedRPC.Iface>(service)); - ThreadPoolExecutor executor = - new ThreadPoolExecutor(workerThreadNum, workerThreadNum, 60, - TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize)); + ThreadPoolExecutor executor = new ThreadPoolExecutor(workerThreadNum, workerThreadNum, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize)); targs.executorService(executor); THsHaServer handlerServer = new THsHaServer(targs); @@ -101,17 +96,14 @@ public class Drpc implements DistributedRPC.Iface, return handlerServer; } - private THsHaServer initInvokeServer(Map conf, final Drpc service) - throws Exception { + private THsHaServer initInvokeServer(Map conf, final Drpc service) throws Exception { int port = JStormUtils.parseInt(conf.get(Config.DRPC_INVOCATIONS_PORT)); TNonblockingServerSocket socket = new TNonblockingServerSocket(port); THsHaServer.Args targsInvoke = new THsHaServer.Args(socket); targsInvoke.workerThreads(64); targsInvoke.protocolFactory(new TBinaryProtocol.Factory()); - targsInvoke - .processor(new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>( - service)); + targsInvoke.processor(new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service)); THsHaServer invokeServer = new THsHaServer(targsInvoke); @@ -136,6 +128,7 @@ public class Drpc implements DistributedRPC.Iface, LOG.info("Starting Distributed RPC servers..."); new Thread(new Runnable() { + @Override public void run() { invokeServer.serve(); @@ -148,11 +141,18 @@ public class Drpc implements DistributedRPC.Iface, clearThread = new AsyncLoopThread(new ClearThread(this)); LOG.info("Successfully start clear thread"); } + private void createPid(Map conf) throws Exception { + String pidDir = StormConfig.drpcPids(conf); + + JStormServerUtils.createPid(pidDir); + } public void init() throws Exception { conf = StormConfig.read_storm_config(); LOG.info("Configuration is \n" + conf); + createPid(conf); + initClearThread(); initThrift(); @@ -188,14 +188,10 @@ public class Drpc implements DistributedRPC.Iface, } private AtomicInteger ctr = new AtomicInteger(0); - private ConcurrentHashMap<String, Semaphore> idtoSem = - new ConcurrentHashMap<String, Semaphore>(); - private ConcurrentHashMap<String, Object> idtoResult = - new ConcurrentHashMap<String, Object>(); - private ConcurrentHashMap<String, Integer> idtoStart = - new ConcurrentHashMap<String, Integer>(); - private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = - new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>(); + private ConcurrentHashMap<String, Semaphore> idtoSem = new ConcurrentHashMap<String, Semaphore>(); + private ConcurrentHashMap<String, Object> idtoResult = new ConcurrentHashMap<String, Object>(); + private ConcurrentHashMap<String, Integer> idtoStart = new ConcurrentHashMap<String, Integer>(); + private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>(); public void cleanup(String id) { LOG.info("clean id " + id + " @ " + (System.currentTimeMillis())); @@ -206,10 +202,8 @@ public class Drpc implements DistributedRPC.Iface, } @Override - public String execute(String function, String args) - throws DRPCExecutionException, TException { - LOG.info("Received DRPC request for " + function + " " + args + " at " - + (System.currentTimeMillis())); + public String execute(String function, String args) throws DRPCExecutionException, TException { + LOG.info("Received DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis())); int idinc = this.ctr.incrementAndGet(); int maxvalue = 1000000000; int newid = idinc % maxvalue; @@ -225,19 +219,16 @@ public class Drpc implements DistributedRPC.Iface, this.idtoSem.put(strid, sem); ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(function); queue.add(req); - LOG.info("Waiting for DRPC request for " + function + " " + args - + " at " + (System.currentTimeMillis())); + LOG.info("Waiting for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis())); try { sem.acquire(); } catch (InterruptedException e) { LOG.error("acquire fail ", e); } - LOG.info("Acquired for DRPC request for " + function + " " + args - + " at " + (System.currentTimeMillis())); + LOG.info("Acquired for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis())); Object result = this.idtoResult.get(strid); - LOG.info("Returning for DRPC request for " + function + " " + args - + " at " + (System.currentTimeMillis())); + LOG.info("Returning for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis())); this.cleanup(strid); @@ -250,8 +241,7 @@ public class Drpc implements DistributedRPC.Iface, @Override public void result(String id, String result) throws TException { Semaphore sem = this.idtoSem.get(id); - LOG.info("Received result " + result + " for id " + id + " at " - + (System.currentTimeMillis())); + LOG.info("Received result " + result + " for id " + id + " at " + (System.currentTimeMillis())); if (sem != null) { this.idtoResult.put(id, result); sem.release(); @@ -265,8 +255,7 @@ public class Drpc implements DistributedRPC.Iface, ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName); DRPCRequest req = queue.poll(); if (req != null) { - LOG.info("Fetched request for " + functionName + " at " - + (System.currentTimeMillis())); + LOG.info("Fetched request for " + functionName + " at " + (System.currentTimeMillis())); return req; } else { return new DRPCRequest("", ""); @@ -277,18 +266,15 @@ public class Drpc implements DistributedRPC.Iface, @Override public void failRequest(String id) throws TException { Semaphore sem = this.idtoSem.get(id); - LOG.info("failRequest result for id " + id + " at " - + (System.currentTimeMillis())); + LOG.info("failRequest result for id " + id + " at " + (System.currentTimeMillis())); if (sem != null) { - this.idtoResult.put(id, - new DRPCExecutionException("Request failed")); + this.idtoResult.put(id, new DRPCExecutionException("Request failed")); sem.release(); } } private ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) { - ConcurrentLinkedQueue<DRPCRequest> reqQueue = - requestQueues.get(function); + ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function); if (reqQueue == null) { reqQueue = new ConcurrentLinkedQueue<DRPCRequest>(); requestQueues.put(function, reqQueue); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java index cdfa1cc..2ec7e20 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java @@ -29,14 +29,12 @@ import com.alibaba.jstorm.callback.RunnableCallback; * Event Manager, drop one event from queue, then execute the event. */ public class EventManagerImp extends RunnableCallback implements EventManager { - private static final Logger LOG = LoggerFactory - .getLogger(EventManagerImp.class); + private static final Logger LOG = LoggerFactory.getLogger(EventManagerImp.class); private AtomicInteger added = new AtomicInteger(); private AtomicInteger processed = new AtomicInteger(); - private LinkedBlockingQueue<RunnableCallback> queue = - new LinkedBlockingQueue<RunnableCallback>(); + private LinkedBlockingQueue<RunnableCallback> queue = new LinkedBlockingQueue<RunnableCallback>(); private Exception e; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java index 7f5e9ef..e4bf9e2 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java @@ -30,8 +30,7 @@ public class EventManagerPusher extends RunnableCallback { private int frequence; - public EventManagerPusher(EventManager eventManager, - RunnableCallback event, int frequence) { + public EventManagerPusher(EventManager eventManager, RunnableCallback event, int frequence) { this.eventManager = eventManager; this.event = event; this.frequence = frequence; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java index d68347a..a83f07f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java @@ -24,11 +24,10 @@ import org.jboss.netty.buffer.ChannelBuffers; public enum ControlMessage { EOB_MESSAGE((short) -201), OK_RESPONSE((short) -200); - private short code; private long timeStamp; protected static int port; - + static public void setPort(int port) { ControlMessage.port = port; } @@ -62,9 +61,7 @@ public enum ControlMessage { * @throws Exception */ ChannelBuffer buffer() throws Exception { - ChannelBufferOutputStream bout = - new ChannelBufferOutputStream( - ChannelBuffers.directBuffer(encodeLength())); + ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength())); write(bout); bout.close(); return bout.buffer(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java index 172822d..45d6600 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java @@ -28,8 +28,7 @@ import org.slf4j.LoggerFactory; import backtype.storm.messaging.TaskMessage; class MessageBatch { - private static final Logger LOG = LoggerFactory - .getLogger(MessageBatch.class); + private static final Logger LOG = LoggerFactory.getLogger(MessageBatch.class); private int buffer_size; private ArrayList<Object> msgs; private int encoded_length; @@ -58,8 +57,7 @@ class MessageBatch { return; } - throw new RuntimeException("Unsuppoted object type " - + obj.getClass().getName()); + throw new RuntimeException("Unsuppoted object type " + obj.getClass().getName()); } void remove(Object obj) { @@ -89,8 +87,7 @@ class MessageBatch { * try to add a TaskMessage to a batch * * @param taskMsg - * @return false if the msg could not be added due to buffer size limit; - * true otherwise + * @return false if the msg could not be added due to buffer size limit; true otherwise */ boolean tryAdd(TaskMessage taskMsg) { if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size) @@ -144,9 +141,7 @@ class MessageBatch { * create a buffer containing the encoding of this batch */ ChannelBuffer buffer() throws Exception { - ChannelBufferOutputStream bout = - new ChannelBufferOutputStream( - ChannelBuffers.directBuffer(encoded_length)); + ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length)); for (Object msg : msgs) if (msg instanceof TaskMessage) @@ -168,19 +163,16 @@ class MessageBatch { /** * write a TaskMessage into a stream * - * Each TaskMessage is encoded as: task ... short(2) len ... int(4) payload - * ... byte[] * + * Each TaskMessage is encoded as: task ... short(2) len ... int(4) payload ... byte[] * */ - private void writeTaskMessage(ChannelBufferOutputStream bout, - TaskMessage message) throws Exception { + private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception { int payload_len = 0; if (message.message() != null) payload_len = message.message().length; int task_id = message.task(); if (task_id > Short.MAX_VALUE) - throw new RuntimeException("Task ID should not exceed " - + Short.MAX_VALUE); + throw new RuntimeException("Task ID should not exceed " + Short.MAX_VALUE); bout.writeShort((short) task_id); bout.writeInt(payload_len); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java index b147092..38e7930 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java @@ -17,11 +17,17 @@ */ package com.alibaba.jstorm.message.netty; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - +import backtype.storm.messaging.TaskMessage; +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.common.metric.AsmHistogram; +import com.alibaba.jstorm.common.metric.AsmMeter; +import com.alibaba.jstorm.common.metric.AsmMetric; +import com.alibaba.jstorm.metric.JStormMetrics; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.utils.NetWorkUtils; +import com.alibaba.jstorm.utils.TimeUtils; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; @@ -29,46 +35,40 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.common.metric.Histogram; -import com.alibaba.jstorm.common.metric.Meter; -import com.alibaba.jstorm.metric.JStormMetrics; -import com.alibaba.jstorm.metric.MetricDef; -import com.alibaba.jstorm.utils.NetWorkUtils; - -import backtype.storm.messaging.TaskMessage; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; public class MessageDecoder extends FrameDecoder { - private static final Logger LOG = LoggerFactory - .getLogger(MessageDecoder.class); + private static final Logger LOG = LoggerFactory.getLogger(MessageDecoder.class); // here doesn't use Timer due to competition - private static Histogram timer = JStormMetrics - .registerWorkerHistogram(MetricDef.NETWORK_MSG_DECODE_TIME); - private static Meter recvSpeed = JStormMetrics - .registerWorkerMeter(MetricDef.NETTY_SRV_RECV_SPEED); - private static Map<Channel, Histogram> networkTransmitTimeMap = - new HashMap<Channel, Histogram>(); - private static Map<Channel, String> transmitNameMap = - new HashMap<Channel, String>(); + + private static AsmHistogram msgDecodeTime = (AsmHistogram) JStormMetrics.registerWorkerMetric( + MetricUtils.workerMetricName(MetricDef.NETWORK_MSG_DECODE_TIME, MetricType.HISTOGRAM), new AsmHistogram()); + private static AsmMeter recvSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric( + MetricUtils.workerMetricName(MetricDef.NETTY_SRV_RECV_SPEED, MetricType.METER), new AsmMeter()); + + private static Map<Channel, AsmHistogram> networkTransmitTimeMap = new HashMap<Channel, AsmHistogram>(); + private static Map<Channel, String> transmitNameMap = new HashMap<Channel, String>(); private boolean isServer; private String localIp; private int localPort; + private boolean enableTransitTimeMetrics; + public MessageDecoder(boolean isServer, Map conf) { this.isServer = isServer; this.localPort = ConfigExtension.getLocalWorkerPort(conf); this.localIp = NetWorkUtils.ip(); - + this.enableTransitTimeMetrics = MetricUtils.isEnableNettyMetrics(conf); } /* - * Each ControlMessage is encoded as: code (<0) ... short(2) Each - * TaskMessage is encoded as: task (>=0) ... short(2) len ... int(4) payload - * ... byte[] * + * Each ControlMessage is encoded as: code (<0) ... short(2) Each TaskMessage is encoded as: task (>=0) ... short(2) len ... int(4) payload ... byte[] * */ - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buf) throws Exception { + protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception { // Make sure that we have received at least a short long available = buf.readableBytes(); // Length of control message is 10. @@ -106,21 +106,20 @@ public class MessageDecoder extends FrameDecoder { available -= 12; if (ctrl_msg == ControlMessage.EOB_MESSAGE) { + long interval = System.currentTimeMillis() - timeStamp; - if (interval > 0) { - - Histogram netTransTime = - getTransmitHistogram(channel, clientPort); - if (netTransTime != null) { - netTransTime.update(interval ); - - } + if (interval < 0) + interval = 0; + + if(enableTransitTimeMetrics) { + AsmHistogram netTransTime = getTransmitHistogram(channel, clientPort); + if (netTransTime != null) { + netTransTime.update(interval * TimeUtils.NS_PER_US); + } } - - recvSpeed.update(Double.valueOf(ControlMessage - .encodeLength())); } + recvSpeed.update(ControlMessage.encodeLength()); return ctrl_msg; } @@ -138,9 +137,7 @@ public class MessageDecoder extends FrameDecoder { // Read the length field. int length = buf.readInt(); if (length <= 0) { - LOG.info( - "Receive one message whose TaskMessage's message length is {}", - length); + LOG.info("Receive one message whose TaskMessage's message length is {}", length); return new TaskMessage(task, null); } @@ -165,72 +162,55 @@ public class MessageDecoder extends FrameDecoder { // task, length, JStormUtils.toPrintableString(rawBytes)); TaskMessage ret = new TaskMessage(task, rawBytes); - recvSpeed.update(Double.valueOf(rawBytes.length + 6)); + recvSpeed.update(rawBytes.length + 6); return ret; } finally { if (isServer) { Long endTime = System.nanoTime(); - timer.update((endTime - startTime) / 1000000.0d); + msgDecodeTime.update((endTime - startTime) / TimeUtils.NS_PER_US); } } } - public Histogram getTransmitHistogram(Channel channel, int clientPort) { - Histogram netTransTime = networkTransmitTimeMap.get(channel); + public AsmHistogram getTransmitHistogram(Channel channel, int clientPort) { + AsmHistogram netTransTime = networkTransmitTimeMap.get(channel); if (netTransTime == null) { + InetSocketAddress sockAddr = (InetSocketAddress) (channel.getRemoteAddress()); + + String nettyConnection = NettyConnection.mkString(sockAddr.getAddress().getHostAddress(), clientPort, localIp, localPort); + netTransTime = + (AsmHistogram) JStormMetrics.registerNettyMetric( + MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection), MetricType.HISTOGRAM), + new AsmHistogram()); - InetSocketAddress sockAddr = - (InetSocketAddress) (channel.getRemoteAddress()); - - String nettyConnection = - NettyConnection.mkString(sockAddr.getAddress() - .getHostAddress(), clientPort, localIp, localPort); - try { - netTransTime = - JStormMetrics.registerWorkerHistogram( - MetricDef.NETTY_SRV_MSG_TRANS_TIME, - nettyConnection); - } catch (Exception e) { - LOG.error("{}.{} has been register", - MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection); - removeTransmitHistogram(nettyConnection); - return null; - } networkTransmitTimeMap.put(channel, netTransTime); transmitNameMap.put(channel, nettyConnection); - LOG.info("Register Transmit Histogram of {}, channel {}", - nettyConnection, channel); + LOG.info("Register Transmit Histogram of {}, channel {}", nettyConnection, channel); } return netTransTime; } public static void removeTransmitHistogram(Channel channel) { - Histogram netTransTime = networkTransmitTimeMap.remove(channel); + AsmHistogram netTransTime = networkTransmitTimeMap.remove(channel); if (netTransTime != null) { - String nettyConnection = transmitNameMap.remove(channel); - - JStormMetrics.unregisterWorkerMetric( - MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection); - - LOG.info("Remove Transmit Histogram of {}, channel {}", - nettyConnection, channel); + JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection), + MetricType.HISTOGRAM)); + LOG.info("Remove Transmit Histogram of {}, channel {}", nettyConnection, channel); } } - + public static void removeTransmitHistogram(String nettyConnection) { Channel oldChannel = null; - - for (Entry<Channel, String> entry: transmitNameMap.entrySet()) { + + for (Entry<Channel, String> entry : transmitNameMap.entrySet()) { if (nettyConnection.equals(entry.getValue())) { oldChannel = entry.getKey(); } } - + removeTransmitHistogram(oldChannel); } - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java index 61e9187..0a750e0 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java @@ -23,8 +23,7 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; public class MessageEncoder extends OneToOneEncoder { @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object obj) throws Exception { + protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception { if (obj instanceof ControlMessage) { return ((ControlMessage) obj).buffer(); } @@ -33,8 +32,7 @@ public class MessageEncoder extends OneToOneEncoder { return ((MessageBatch) obj).buffer(); } - throw new RuntimeException("Unsupported encoding of object of class " - + obj.getClass().getName()); + throw new RuntimeException("Unsupported encoding of object of class " + obj.getClass().getName()); } }
