http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java index 8a55ec5..9b86458 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java @@ -35,8 +35,7 @@ import com.alibaba.jstorm.utils.JStormUtils; /** * - * tuple sending object, which get which task should tuple be send to, and - * update statics + * tuple sending object, which get which task should tuple be send to, and update statics * * @author yannian/Longda * @@ -58,8 +57,7 @@ public class TaskSendTargets { private boolean isDebuging = false; private String debugIdStr; - public TaskSendTargets(Map<Object, Object> _storm_conf, String _component, - Map<String, Map<String, MkGrouper>> _stream_component_grouper, + public TaskSendTargets(Map<Object, Object> _storm_conf, String _component, Map<String, Map<String, MkGrouper>> _stream_component_grouper, TopologyContext _topology_context, TaskBaseMetric _task_stats) { this.stormConf = _storm_conf; this.componentId = _component; @@ -67,17 +65,14 @@ public class TaskSendTargets { this.topologyContext = _topology_context; this.taskStats = _task_stats; - isDebuging = - JStormUtils.parseBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), - false); + isDebuging = JStormUtils.parseBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false); taskId = topologyContext.getThisTaskId(); debugIdStr = " Emit from " + componentId + ":" + taskId + " "; } // direct send tuple to special task - public java.util.List<Integer> get(Integer out_task_id, String stream, - List<Object> tuple) { + public List<Integer> get(Integer out_task_id, String stream, List<Object> tuple) { // in order to improve acker's speed, skip checking // String target_component = @@ -92,29 +87,26 @@ public class TaskSendTargets { // } if (isDebuging) { - LOG.info(debugIdStr + stream + " to " + out_task_id + ":" - + tuple.toString()); + LOG.info(debugIdStr + stream + " to " + out_task_id + ":" + tuple.toString()); } taskStats.send_tuple(stream, 1); - java.util.List<Integer> out_tasks = new ArrayList<Integer>(); + List<Integer> out_tasks = new ArrayList<Integer>(); out_tasks.add(out_task_id); return out_tasks; } // send tuple according to grouping - public java.util.List<Integer> get(String stream, List<Object> tuple) { - java.util.List<Integer> out_tasks = new ArrayList<Integer>(); + public List<Integer> get(String stream, List<Object> tuple) { + List<Integer> out_tasks = new ArrayList<Integer>(); // get grouper, then get which task should tuple be sent to. - Map<String, MkGrouper> componentCrouping = - streamComponentgrouper.get(stream); + Map<String, MkGrouper> componentCrouping = streamComponentgrouper.get(stream); if (componentCrouping == null) { // if the target component's parallelism is 0, don't need send to // them - LOG.debug("Failed to get Grouper of " + stream + " in " - + debugIdStr); + LOG.debug("Failed to get Grouper of " + stream + " in " + debugIdStr); return out_tasks; } @@ -123,8 +115,7 @@ public class TaskSendTargets { MkGrouper g = ee.getValue(); if (GrouperType.direct.equals(g.gettype())) { - throw new IllegalArgumentException( - "Cannot do regular emit to direct stream"); + throw new IllegalArgumentException("Cannot do regular emit to direct stream"); } out_tasks.addAll(g.grouper(tuple)); @@ -133,8 +124,7 @@ public class TaskSendTargets { if (isDebuging) { - LOG.info(debugIdStr + stream + " to " + out_tasks + ":" - + tuple.toString()); + LOG.info(debugIdStr + stream + " to " + out_tasks + ":" + tuple.toString()); } int num_out_tasks = out_tasks.size(); @@ -144,8 +134,7 @@ public class TaskSendTargets { return out_tasks; } - public void updateStreamCompGrouper( - Map<String, Map<String, MkGrouper>> streamComponentgrouper) { + public void updateStreamCompGrouper(Map<String, Map<String, MkGrouper>> streamComponentgrouper) { this.streamComponentgrouper = streamComponentgrouper; } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java index a6a3406..dc2a2bf 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java @@ -66,8 +66,7 @@ public class TupleInfo implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java index 72c4061..6b0ed2d 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java @@ -32,11 +32,9 @@ import com.alibaba.jstorm.task.TaskTransfer; */ public class UnanchoredSend { - public static void send(TopologyContext topologyContext, - TaskSendTargets taskTargets, TaskTransfer transfer_fn, - String stream, List<Object> values) { + public static void send(TopologyContext topologyContext, TaskSendTargets taskTargets, TaskTransfer transfer_fn, String stream, List<Object> values) { - java.util.List<Integer> tasks = taskTargets.get(stream, values); + List<Integer> tasks = taskTargets.get(stream, values); if (tasks.size() == 0) { return; } @@ -44,8 +42,7 @@ public class UnanchoredSend { Integer taskId = topologyContext.getThisTaskId(); for (Integer task : tasks) { - TupleImplExt tup = - new TupleImplExt(topologyContext, values, taskId, stream); + TupleImplExt tup = new TupleImplExt(topologyContext, values, taskId, stream); tup.setTargetTaskId(task); transfer_fn.transfer(tup); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java index d0d70be..2475ede 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java @@ -25,4 +25,6 @@ package com.alibaba.jstorm.task.error; */ public interface ITaskReportErr { public void report(Throwable error); + + public void report(String error); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java index d460e5a..1b41037 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java @@ -20,8 +20,7 @@ package com.alibaba.jstorm.task.error; import com.alibaba.jstorm.callback.RunnableCallback; /** - * The callback will be called, when task occur error It just call - * TaskReportErrorAndDie + * The callback will be called, when task occur error It just call TaskReportErrorAndDie * * @author yannian * http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java index e7506b2..bf177f6 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java @@ -34,8 +34,7 @@ public class TaskReportError implements ITaskReportErr { private String topology_id; private int task_id; - public TaskReportError(StormClusterState _storm_cluster_state, - String _topology_id, int _task_id) { + public TaskReportError(StormClusterState _storm_cluster_state, String _topology_id, int _task_id) { this.zkCluster = _storm_cluster_state; this.topology_id = _topology_id; this.task_id = _task_id; @@ -44,16 +43,23 @@ public class TaskReportError implements ITaskReportErr { @Override public void report(Throwable error) { - LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/" - + task_id + "\n", error); + LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", error); try { zkCluster.report_task_error(topology_id, task_id, error); } catch (Exception e) { // TODO Auto-generated catch block - LOG.error("Failed update error to /ZK/taskerrors/" + topology_id - + "/" + task_id + "\n", e); + LOG.error("Failed update error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", e); } - } + @Override + public void report(String error) { + + LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/" + task_id + ": " + error); + try { + zkCluster.report_task_error(topology_id, task_id, error, null); + } catch (Exception e) { + LOG.error("Failed update error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", e); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java index 4f4eab3..e8596de 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java @@ -29,15 +29,20 @@ public class TaskReportErrorAndDie implements ITaskReportErr { private ITaskReportErr reporterror; private RunnableCallback haltfn; - public TaskReportErrorAndDie(ITaskReportErr _reporterror, - RunnableCallback _haltfn) { + public TaskReportErrorAndDie(ITaskReportErr _reporterror, RunnableCallback _haltfn) { this.reporterror = _reporterror; this.haltfn = _haltfn; } + // If throwable error was caught, a error will be reported and current task will be shutdown. @Override public void report(Throwable error) { this.reporterror.report(error); this.haltfn.run(); } + + @Override + public void report(String error) { + this.reporterror.report(error); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java index 3f4c18f..7e4495a 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java @@ -17,43 +17,40 @@ */ package com.alibaba.jstorm.task.execute; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; -import backtype.storm.serialization.KryoTupleDeserializer; +import backtype.storm.Constants; +import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Tuple; import backtype.storm.utils.DisruptorQueue; import backtype.storm.utils.Utils; import backtype.storm.utils.WorkerClassLoader; - -import com.alibaba.jstorm.callback.AsyncLoopRunnable; -import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.common.metric.Histogram; +import com.alibaba.jstorm.common.metric.AsmGauge; import com.alibaba.jstorm.common.metric.QueueGauge; import com.alibaba.jstorm.daemon.worker.timer.RotatingMapTrigger; +import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger; import com.alibaba.jstorm.daemon.worker.timer.TaskHeartbeatTrigger; -import com.alibaba.jstorm.metric.JStormHealthCheck; -import com.alibaba.jstorm.metric.JStormMetrics; -import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.metric.*; import com.alibaba.jstorm.task.Task; import com.alibaba.jstorm.task.TaskBaseMetric; +import com.alibaba.jstorm.task.TaskBatchTransfer; import com.alibaba.jstorm.task.TaskStatus; import com.alibaba.jstorm.task.TaskTransfer; import com.alibaba.jstorm.task.error.ITaskReportErr; import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; -import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; //import com.alibaba.jstorm.message.zeroMq.IRecvConnection; @@ -67,15 +64,17 @@ import com.lmax.disruptor.dsl.ProducerType; public class BaseExecutors extends RunnableCallback { private static Logger LOG = LoggerFactory.getLogger(BaseExecutors.class); - protected final String component_id; + protected final String topologyId; + protected final String componentId; protected final int taskId; protected final String idStr; protected Map storm_conf; - + protected final boolean isDebug; protected TopologyContext userTopologyCtx; + protected TopologyContext sysTopologyCtx; protected TaskBaseMetric task_stats; protected volatile TaskStatus taskStatus; @@ -93,74 +92,91 @@ public class BaseExecutors extends RunnableCallback { protected Task task; protected long assignmentTs; protected TaskTransfer taskTransfer; + + protected JStormMetricsReporter metricsReporter; + + protected boolean isFinishInit = false; + + protected RotatingMapTrigger rotatingMapTrigger; + protected TaskHeartbeatTrigger taskHbTrigger; // protected IntervalCheck intervalCheck = new IntervalCheck(); - public BaseExecutors(Task task, TaskTransfer _transfer_fn, Map _storm_conf, - Map<Integer, DisruptorQueue> innerTaskTransfer, - TopologyContext topology_context, TopologyContext _user_context, - TaskBaseMetric _task_stats, TaskStatus taskStatus, - ITaskReportErr _report_error) { + public BaseExecutors(Task task) { this.task = task; - this.storm_conf = _storm_conf; - - this.userTopologyCtx = _user_context; - this.task_stats = _task_stats; - this.taskId = topology_context.getThisTaskId(); - this.innerTaskTransfer = innerTaskTransfer; - this.component_id = topology_context.getThisComponentId(); - this.idStr = JStormServerUtils.getName(component_id, taskId); - - this.taskStatus = taskStatus; - this.report_error = _report_error; - - this.isDebug = - JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), - false); - - message_timeout_secs = - JStormUtils.parseInt( - storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), - 30); - - int queue_size = - Utils.getInt(storm_conf - .get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256); - WaitStrategy waitStrategy = - (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf); - this.exeQueue = - DisruptorQueue.mkInstance(idStr, ProducerType.MULTI, - queue_size, waitStrategy); + this.storm_conf = task.getStormConf(); + + this.userTopologyCtx = task.getUserContext(); + this.sysTopologyCtx = task.getTopologyContext(); + this.task_stats = task.getTaskStats(); + this.taskId = sysTopologyCtx.getThisTaskId(); + this.innerTaskTransfer = task.getInnerTaskTransfer(); + this.topologyId = sysTopologyCtx.getTopologyId(); + this.componentId = sysTopologyCtx.getThisComponentId(); + this.idStr = JStormServerUtils.getName(componentId, taskId); + + this.taskStatus = task.getTaskStatus(); + this.report_error = task.getReportErrorDie(); + this.taskTransfer = task.getTaskTransfer(); + this.metricsReporter = task.getWorkerData().getMetricsReporter(); + + this.isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false); + + message_timeout_secs = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30); + + int queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256); + WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf); + this.exeQueue = DisruptorQueue.mkInstance(idStr, ProducerType.MULTI, queue_size, waitStrategy); this.exeQueue.consumerStarted(); - this.controlQueue = new LinkedBlockingDeque<Object>(8); + this.controlQueue = new LinkedBlockingDeque<Object>(); this.registerInnerTransfer(exeQueue); - QueueGauge exeQueueGauge = - new QueueGauge(idStr + MetricDef.EXECUTE_QUEUE, exeQueue); - JStormMetrics.registerTaskGauge(exeQueueGauge, taskId, - MetricDef.EXECUTE_QUEUE); - JStormHealthCheck.registerTaskHealthCheck(taskId, - MetricDef.EXECUTE_QUEUE, exeQueueGauge); + QueueGauge exeQueueGauge = new QueueGauge(exeQueue, idStr, MetricDef.EXECUTE_QUEUE); + JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_QUEUE, MetricType.GAUGE), new AsmGauge( + exeQueueGauge)); + JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.EXECUTE_QUEUE, exeQueueGauge); - RotatingMapTrigger rotatingMapTrigger = - new RotatingMapTrigger(storm_conf, idStr + "_rotating", - exeQueue); + rotatingMapTrigger = new RotatingMapTrigger(storm_conf, idStr + "_rotating", exeQueue); rotatingMapTrigger.register(); - TaskHeartbeatTrigger taskHbTrigger = - new TaskHeartbeatTrigger(storm_conf, idStr + "_taskHeartbeat", - exeQueue, controlQueue, taskId); + taskHbTrigger = new TaskHeartbeatTrigger(storm_conf, idStr + "_taskHeartbeat", exeQueue, controlQueue, taskId, componentId, sysTopologyCtx, report_error); taskHbTrigger.register(); - + assignmentTs = System.currentTimeMillis(); - this.taskTransfer = _transfer_fn; + } + + public void init() throws Exception { + // this function will be override by SpoutExecutor or BoltExecutor + throw new RuntimeException("Should implement this function"); + } + + public void initWrapper() { + try { + LOG.info("{} begin to init", idStr); + + init(); + + if (taskId == getMinTaskIdOfWorker()) { + metricsReporter.setOutputCollector(getOutputCollector()); + } + + isFinishInit = true; + } catch (Throwable e) { + error = e; + LOG.error("Init error ", e); + report_error.report(e); + } finally { + + LOG.info("{} initialization finished", idStr); + + } } @Override public void preRun() { - WorkerClassLoader.switchThreadContext(); + WorkerClassLoader.switchThreadContext(); } @Override @@ -174,28 +190,11 @@ public class BaseExecutors extends RunnableCallback { throw new RuntimeException("Should implement this function"); } - // @Override - // public Object getResult() { - // if (taskStatus.isRun()) { - // return 0; - // } else if (taskStatus.isPause()) { - // return 0; - // } else if (taskStatus.isShutdown()) { - // this.shutdown(); - // return -1; - // } else { - // LOG.info("Unknow TaskStatus, shutdown executing thread of " + idStr); - // this.shutdown(); - // return -1; - // } - // } - @Override public Exception error() { if (error == null) { return null; } - return new Exception(error); } @@ -213,12 +212,9 @@ public class BaseExecutors extends RunnableCallback { LOG.info("Registor inner transfer for executor thread of " + idStr); DisruptorQueue existInnerTransfer = innerTaskTransfer.get(taskId); if (existInnerTransfer != null) { - LOG.info("Exist inner task transfer for executing thread of " - + idStr); + LOG.info("Exist inner task transfer for executing thread of " + idStr); if (existInnerTransfer != disruptorQueue) { - throw new RuntimeException( - "Inner task transfer must be only one in executing thread of " - + idStr); + throw new RuntimeException("Inner task transfer must be only one in executing thread of " + idStr); } } innerTaskTransfer.put(taskId, disruptorQueue); @@ -229,4 +225,12 @@ public class BaseExecutors extends RunnableCallback { innerTaskTransfer.remove(taskId); } + protected int getMinTaskIdOfWorker() { + SortedSet<Integer> tasks = new TreeSet<Integer>(sysTopologyCtx.getThisWorkerTasks()); + return tasks.first(); + } + + public Object getOutputCollector() { + return null; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java index a51d09a..c4ee4ad 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java @@ -28,16 +28,13 @@ import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.Config; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.MessageId; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImplExt; - -import com.alibaba.jstorm.common.metric.Histogram; +import com.alibaba.jstorm.common.metric.AsmHistogram; +import com.alibaba.jstorm.common.metric.AsmMetric; import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.task.Task; import com.alibaba.jstorm.task.TaskBaseMetric; import com.alibaba.jstorm.task.TaskTransfer; import com.alibaba.jstorm.task.acker.Acker; @@ -48,11 +45,18 @@ import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.RotatingMap; import com.alibaba.jstorm.utils.TimeUtils; +import backtype.storm.Config; +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.MessageId; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.TupleExt; +import backtype.storm.tuple.TupleImplExt; + /** * bolt output interface, do emit/ack/fail * * @author yannian/Longda - * */ public class BoltCollector implements IOutputCollector { private static Logger LOG = LoggerFactory.getLogger(BoltCollector.class); @@ -72,62 +76,56 @@ public class BoltCollector implements IOutputCollector { private Map storm_conf; private Integer ackerNum; - private Histogram timer; + private AsmMetric timer; private Random random; - - public BoltCollector(int message_timeout_secs, ITaskReportErr report_error, - TaskSendTargets _send_fn, Map _storm_conf, - TaskTransfer _transfer_fn, TopologyContext _topology_context, - Integer task_id, RotatingMap<Tuple, Long> tuple_start_times, - TaskBaseMetric _task_stats) { - - this.rotateTime = - 1000L * message_timeout_secs / (Acker.TIMEOUT_BUCKET_NUM - 1); - this.reportError = report_error; - this.sendTargets = _send_fn; - this.storm_conf = _storm_conf; - this.taskTransfer = _transfer_fn; - this.topologyContext = _topology_context; - this.task_id = task_id; - this.task_stats = _task_stats; - - this.pending_acks = - new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM); + + + //ITaskReportErr report_error, TaskSendTargets _send_fn, Map _storm_conf, TaskTransfer _transfer_fn, + //TopologyContext _topology_context, Integer task_id, TaskBaseMetric _task_stats + public BoltCollector(Task task, RotatingMap<Tuple, Long> tuple_start_times, int message_timeout_secs) { + + this.rotateTime = 1000L * message_timeout_secs / (Acker.TIMEOUT_BUCKET_NUM - 1); + this.reportError = task.getReportErrorDie(); + this.sendTargets = task.getTaskSendTargets(); + this.storm_conf = task.getStormConf(); + this.taskTransfer = task.getTaskTransfer(); + this.topologyContext = task.getTopologyContext(); + this.task_id = task.getTaskId(); + this.task_stats = task.getTaskStats(); + + this.pending_acks = new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM); // this.pending_acks = new TimeCacheMap<Tuple, // Long>(message_timeout_secs, // Acker.TIMEOUT_BUCKET_NUM); this.tuple_start_times = tuple_start_times; - this.ackerNum = - JStormUtils.parseInt(storm_conf - .get(Config.TOPOLOGY_ACKER_EXECUTORS)); + this.ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS)); String componentId = topologyContext.getThisComponentId(); - timer = - JStormMetrics.registerTaskHistogram(task_id, - MetricDef.COLLECTOR_EMIT_TIME); + this.timer = + JStormMetrics.registerTaskMetric( + MetricUtils.taskMetricName(topologyContext.getTopologyId(), componentId, task_id, MetricDef.COLLECTOR_EMIT_TIME, MetricType.HISTOGRAM), + new AsmHistogram()); random = new Random(); random.setSeed(System.currentTimeMillis()); + } @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, - List<Object> tuple) { + public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { return boltEmit(streamId, anchors, tuple, null); } @Override - public void emitDirect(int taskId, String streamId, - Collection<Tuple> anchors, List<Object> tuple) { + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { boltEmit(streamId, anchors, tuple, taskId); } - private List<Integer> boltEmit(String out_stream_id, - Collection<Tuple> anchors, List<Object> values, Integer out_task_id) { - long start = System.nanoTime(); + private List<Integer> boltEmit(String out_stream_id, Collection<Tuple> anchors, List<Object> values, Integer out_task_id) { + final long start = System.nanoTime(); try { - java.util.List<Integer> out_tasks = null; + List<Integer> out_tasks; if (out_task_id != null) { out_tasks = sendTargets.get(out_task_id, out_stream_id, values); } else { @@ -146,59 +144,49 @@ public class BoltCollector implements IOutputCollector { lastRotate = now; } put_xor(pending_acks, a, edge_id); - for (Long root_id : a.getMessageId().getAnchorsToIds() - .keySet()) { + for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) { put_xor(anchors_to_ids, root_id, edge_id); } } } MessageId msgid = MessageId.makeId(anchors_to_ids); - TupleImplExt tupleExt = - new TupleImplExt(topologyContext, values, task_id, - out_stream_id, msgid); + TupleImplExt tupleExt = new TupleImplExt(topologyContext, values, task_id, out_stream_id, msgid); tupleExt.setTargetTaskId(t); taskTransfer.transfer(tupleExt); - } return out_tasks; } catch (Exception e) { LOG.error("bolt emit", e); } finally { long end = System.nanoTime(); - timer.update((end - start)/1000000.0d); + timer.update((end - start) / TimeUtils.NS_PER_US); } return new ArrayList<Integer>(); } @Override public void ack(Tuple input) { - if (ackerNum > 0) { - - Long ack_val = Long.valueOf(0); + Long ack_val = 0L; Object pend_val = pending_acks.remove(input); if (pend_val != null) { ack_val = (Long) (pend_val); } - for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds() - .entrySet()) { - - UnanchoredSend.send( - topologyContext, - sendTargets, - taskTransfer, - Acker.ACKER_ACK_STREAM_ID, - JStormUtils.mk_list((Object) e.getKey(), - JStormUtils.bit_xor(e.getValue(), ack_val))); + for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) { + UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Acker.ACKER_ACK_STREAM_ID, + JStormUtils.mk_list((Object) e.getKey(), JStormUtils.bit_xor(e.getValue(), ack_val))); } } - Long delta = tuple_time_delta(tuple_start_times, input); - if (delta != null) { - task_stats.bolt_acked_tuple(input.getSourceComponent(), - input.getSourceStreamId(), Double.valueOf(delta)); + Long startTime = (Long) tuple_start_times.remove(input); + if (startTime != null) { + Long endTime = System.nanoTime(); + long latency = (endTime - startTime)/TimeUtils.NS_PER_US; + long lifeCycle = (System.currentTimeMillis() - ((TupleExt) input).getCreationTimeStamp()) * TimeUtils.NS_PER_US; + + task_stats.bolt_acked_tuple(input.getSourceComponent(), input.getSourceStreamId(), latency, lifeCycle); } } @@ -207,17 +195,12 @@ public class BoltCollector implements IOutputCollector { // if ackerNum == 0, we can just return if (ackerNum > 0) { pending_acks.remove(input); - for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds() - .entrySet()) { - UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, - Acker.ACKER_FAIL_STREAM_ID, - JStormUtils.mk_list((Object) e.getKey())); + for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) { + UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Acker.ACKER_FAIL_STREAM_ID, JStormUtils.mk_list((Object) e.getKey())); } } - task_stats.bolt_failed_tuple(input.getSourceComponent(), - input.getSourceStreamId()); - + task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId()); } @Override @@ -226,21 +209,19 @@ public class BoltCollector implements IOutputCollector { } // Utility functions, just used here - public static Long tuple_time_delta(RotatingMap<Tuple, Long> start_times, - Tuple tuple) { + public static Long tuple_time_delta(RotatingMap<Tuple, Long> start_times, Tuple tuple) { Long start_time = (Long) start_times.remove(tuple); if (start_time != null) { - return TimeUtils.time_delta_ms(start_time); + return (System.nanoTime() - start_time)/TimeUtils.NS_PER_US; } return null; } - public static void put_xor(RotatingMap<Tuple, Long> pending, Tuple key, - Long id) { + public static void put_xor(RotatingMap<Tuple, Long> pending, Tuple key, Long id) { // synchronized (pending) { Long curr = pending.get(key); if (curr == null) { - curr = Long.valueOf(0); + curr = 0L; } pending.put(key, JStormUtils.bit_xor(curr, id)); // } @@ -250,7 +231,7 @@ public class BoltCollector implements IOutputCollector { // synchronized (pending) { Long curr = pending.get(key); if (curr == null) { - curr = Long.valueOf(0); + curr = 0L; } pending.put(key, JStormUtils.bit_xor(curr, id)); // } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java index 15adbf2..5c4413e 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java @@ -17,13 +17,6 @@ */ package com.alibaba.jstorm.task.execute; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.Constants; import backtype.storm.task.IBolt; @@ -32,37 +25,48 @@ import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.BatchTuple; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.TupleExt; +import backtype.storm.tuple.TupleImplExt; +import backtype.storm.tuple.Values; import backtype.storm.utils.DisruptorQueue; import backtype.storm.utils.WorkerClassLoader; import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.common.metric.Histogram; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.common.metric.AsmHistogram; +import com.alibaba.jstorm.common.metric.AsmMetric; +import com.alibaba.jstorm.daemon.worker.timer.BackpressureCheckTrigger; import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger; import com.alibaba.jstorm.daemon.worker.timer.TickTupleTrigger; import com.alibaba.jstorm.daemon.worker.timer.TimerConstants; import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger; import com.alibaba.jstorm.metric.JStormMetrics; +import com.alibaba.jstorm.metric.JStormMetricsReporter; import com.alibaba.jstorm.metric.MetricDef; -import com.alibaba.jstorm.task.Task; -import com.alibaba.jstorm.task.TaskBaseMetric; -import com.alibaba.jstorm.task.TaskStatus; -import com.alibaba.jstorm.task.TaskTransfer; -import com.alibaba.jstorm.task.TaskBatchTransfer; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.task.*; import com.alibaba.jstorm.task.acker.Acker; +import com.alibaba.jstorm.task.backpressure.BackpressureTrigger; import com.alibaba.jstorm.task.comm.TaskSendTargets; import com.alibaba.jstorm.task.error.ITaskReportErr; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.RotatingMap; import com.alibaba.jstorm.utils.TimeUtils; import com.lmax.disruptor.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + /** - * * BoltExecutor * * @author yannian/Longda - * */ public class BoltExecutors extends BaseExecutors implements EventHandler { private static Logger LOG = LoggerFactory.getLogger(BoltExecutors.class); @@ -76,89 +80,56 @@ public class BoltExecutors extends BaseExecutors implements EventHandler { // internal outputCollector is BoltCollector private OutputCollector outputCollector; - private Histogram boltExeTimer; - - public BoltExecutors(Task task, IBolt _bolt, TaskTransfer _transfer_fn, - Map<Integer, DisruptorQueue> innerTaskTransfer, Map storm_conf, - TaskSendTargets _send_fn, TaskStatus taskStatus, - TopologyContext sysTopologyCxt, TopologyContext userTopologyCxt, - TaskBaseMetric _task_stats, ITaskReportErr _report_error) { + private AsmMetric boltExeTimer; + private volatile double exeTime; - super(task, _transfer_fn, storm_conf, innerTaskTransfer, - sysTopologyCxt, userTopologyCxt, _task_stats, taskStatus, - _report_error); + private BackpressureTrigger backpressureTrigger; + private boolean isSystemBolt; - this.bolt = _bolt; + //, IBolt _bolt, TaskTransfer _transfer_fn, Map<Integer, DisruptorQueue> innerTaskTransfer, Map storm_conf, + //TaskSendTargets _send_fn, TaskStatus taskStatus, TopologyContext sysTopologyCxt, TopologyContext userTopologyCxt, TaskBaseMetric _task_stats, + //ITaskReportErr _report_error, JStormMetricsReporter metricReport + public BoltExecutors(Task task) { - // create TimeCacheMap + super(task); - this.tuple_start_times = - new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM); + this.bolt = (IBolt)task.getTaskObj(); - this.ackerNum = - JStormUtils.parseInt(storm_conf - .get(Config.TOPOLOGY_ACKER_EXECUTORS)); - - // don't use TimeoutQueue for recv_tuple_queue, - // then other place should check the queue size - // TimeCacheQueue.DefaultExpiredCallback<Tuple> logExpireCb = new - // TimeCacheQueue.DefaultExpiredCallback<Tuple>( - // idStr); - // this.recv_tuple_queue = new - // TimeCacheQueue<Tuple>(message_timeout_secs, - // TimeCacheQueue.DEFAULT_NUM_BUCKETS, logExpireCb); + // create TimeCacheMap + this.tuple_start_times = new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM); + this.ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS)); // create BoltCollector - IOutputCollector output_collector = - new BoltCollector(message_timeout_secs, _report_error, - _send_fn, storm_conf, _transfer_fn, sysTopologyCxt, - taskId, tuple_start_times, _task_stats); - + IOutputCollector output_collector = new BoltCollector(task, tuple_start_times, message_timeout_secs); outputCollector = new OutputCollector(output_collector); + taskHbTrigger.setBoltOutputCollector(outputCollector); - boltExeTimer = - JStormMetrics.registerTaskHistogram(taskId, - MetricDef.EXECUTE_TIME); + String metricName = MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_TIME, MetricType.HISTOGRAM); + this.boltExeTimer = JStormMetrics.registerTaskMetric(metricName, new AsmHistogram()); - Object tickFrequence = - storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + Object tickFrequence = storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS); if (tickFrequence != null) { Integer frequence = JStormUtils.parseInt(tickFrequence); - TickTupleTrigger tickTupleTrigger = - new TickTupleTrigger(sysTopologyCxt, frequence, idStr - + Constants.SYSTEM_TICK_STREAM_ID, exeQueue); + TickTupleTrigger tickTupleTrigger = new TickTupleTrigger(sysTopologyCtx, frequence, idStr + Constants.SYSTEM_TICK_STREAM_ID, exeQueue); tickTupleTrigger.register(); } - - if (ConfigExtension.isTaskBatchTuple(storm_conf)) { - TaskBatchFlushTrigger batchFlushTrigger = - new TaskBatchFlushTrigger(5, idStr - + Constants.SYSTEM_COMPONENT_ID, - (TaskBatchTransfer) _transfer_fn); - batchFlushTrigger.register(TimeUnit.MILLISECONDS); - } - - try { - // do prepare - WorkerClassLoader.switchThreadContext(); - - // Method method = IBolt.class.getMethod("prepare", new Class[] - // {Map.class, TopologyContext.class, - // OutputCollector.class}); - // method.invoke(bolt, new Object[] {storm_conf, userTopologyCxt, - // outputCollector}); - bolt.prepare(storm_conf, userTopologyCtx, outputCollector); - - } catch (Throwable e) { - error = e; - LOG.error("bolt prepare error ", e); - report_error.report(e); - } finally { - WorkerClassLoader.restoreThreadContext(); + + + isSystemBolt = Common.isSystemComponent(componentId); + if (isSystemBolt == false) { + backpressureTrigger = new BackpressureTrigger(task, this, storm_conf, outputCollector); + int backpressureCheckFrequence = ConfigExtension.getBackpressureCheckIntervl(storm_conf); + BackpressureCheckTrigger backpressureCheckTrigger = + new BackpressureCheckTrigger(30, backpressureCheckFrequence, idStr + " backpressure check trigger", backpressureTrigger); + backpressureCheckTrigger.register(TimeUnit.MILLISECONDS); } LOG.info("Successfully create BoltExecutors " + idStr); - + } + + @Override + public void init() { + bolt.prepare(storm_conf, userTopologyCtx, outputCollector); } @Override @@ -168,10 +139,14 @@ public class BoltExecutors extends BaseExecutors implements EventHandler { @Override public void run() { + if (isFinishInit == false) { + initWrapper(); + } while (taskStatus.isShutdown() == false) { try { + //if (backpressureTrigger != null) + // backpressureTrigger.checkAndTrigger(); exeQueue.consumeBatchWhenAvailable(this); - processControlEvent(); } catch (Throwable e) { if (taskStatus.isShutdown() == false) { @@ -182,20 +157,19 @@ public class BoltExecutors extends BaseExecutors implements EventHandler { } @Override - public void onEvent(Object event, long sequence, boolean endOfBatch) - throws Exception { - + public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception { if (event == null) { return; } long start = System.nanoTime(); - try { if (event instanceof Tuple) { + processControlEvent(); processTupleEvent((Tuple) event); } else if (event instanceof BatchTuple) { for (Tuple tuple : ((BatchTuple) event).getTuples()) { + processControlEvent(); processTupleEvent((Tuple) tuple); } } else if (event instanceof TimerTrigger.TimerEvent) { @@ -205,18 +179,21 @@ public class BoltExecutors extends BaseExecutors implements EventHandler { } } finally { long end = System.nanoTime(); - boltExeTimer.update((end - start) / 1000000.0d); + exeTime = (end - start) / TimeUtils.NS_PER_US; + boltExeTimer.update(exeTime); } } private void processTupleEvent(Tuple tuple) { - task_stats.recv_tuple(tuple.getSourceComponent(), - tuple.getSourceStreamId()); - - tuple_start_times.put(tuple, System.currentTimeMillis()); + task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId()); + tuple_start_times.put(tuple, System.nanoTime()); try { - bolt.execute(tuple); + if (isSystemBolt == false && tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) { + backpressureTrigger.handle(tuple); + } else { + bolt.execute(tuple); + } } catch (Throwable e) { error = e; LOG.error("bolt execute error ", e); @@ -226,11 +203,12 @@ public class BoltExecutors extends BaseExecutors implements EventHandler { if (ackerNum == 0) { // only when acker is disable // get tuple process latency - Long start_time = (Long) tuple_start_times.remove(tuple); - if (start_time != null) { - Long delta = TimeUtils.time_delta_ms(start_time); - task_stats.bolt_acked_tuple(tuple.getSourceComponent(), - tuple.getSourceStreamId(), Double.valueOf(delta)); + Long startTime = (Long) tuple_start_times.remove(tuple); + if (startTime != null) { + Long endTime = System.nanoTime(); + long latency = (endTime - startTime)/TimeUtils.NS_PER_US; + long lifeCycle = (System.currentTimeMillis() - ((TupleExt) tuple).getCreationTimeStamp()) * TimeUtils.NS_PER_US; + task_stats.bolt_acked_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), latency, lifeCycle); } } } @@ -244,8 +222,7 @@ public class BoltExecutors extends BaseExecutors implements EventHandler { // only when acker is enable for (Entry<Tuple, Long> entry : timeoutMap.entrySet()) { Tuple input = entry.getKey(); - task_stats.bolt_failed_tuple(input.getSourceComponent(), - input.getSourceStreamId()); + task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId()); } } break; @@ -262,13 +239,11 @@ public class BoltExecutors extends BaseExecutors implements EventHandler { break; } case TimerConstants.TASK_HEARTBEAT: { - Integer taskId = (Integer) event.getMsg(); - TaskHeartbeatRunable.updateTaskHbStats(taskId, task); + taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs()); break; } default: { - LOG.warn("Receive unsupported timer event, opcode=" - + event.getOpCode()); + LOG.warn("Receive unsupported timer event, opcode=" + event.getOpCode()); break; } } @@ -282,9 +257,17 @@ public class BoltExecutors extends BaseExecutors implements EventHandler { processTimerEvent((TimerTrigger.TimerEvent) event); LOG.debug("Received one event from control queue"); } else { - LOG.warn("Received unknown control event, " - + event.getClass().getName()); + LOG.warn("Received unknown control event, " + event.getClass().getName()); } } } + + public double getExecuteTime() { + return exeTime; + } + + @Override + public Object getOutputCollector() { + return outputCollector; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java index d554efc..d3bc53a 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java @@ -23,10 +23,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.spout.ISpout; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.TupleExt; import com.alibaba.jstorm.client.spout.IAckValueSpout; import com.alibaba.jstorm.task.TaskBaseMetric; import com.alibaba.jstorm.task.comm.TupleInfo; +import com.alibaba.jstorm.utils.TimeUtils; /** * The action after spout receive one ack tuple @@ -38,15 +41,15 @@ public class AckSpoutMsg implements IAckMsg { private static Logger LOG = LoggerFactory.getLogger(AckSpoutMsg.class); private ISpout spout; + private Tuple tuple; + private TupleInfo tupleInfo; private Object msgId; private String stream; - private long timeStamp; private List<Object> values; private TaskBaseMetric task_stats; private boolean isDebug = false; - public AckSpoutMsg(ISpout _spout, TupleInfo tupleInfo, - TaskBaseMetric _task_stats, boolean _isDebug) { + public AckSpoutMsg(ISpout _spout, Tuple tuple, TupleInfo tupleInfo, TaskBaseMetric _task_stats, boolean _isDebug) { this.task_stats = _task_stats; @@ -55,10 +58,11 @@ public class AckSpoutMsg implements IAckMsg { this.msgId = tupleInfo.getMessageId(); this.stream = tupleInfo.getStream(); - if (tupleInfo.getTimestamp() != 0) { - this.timeStamp = System.currentTimeMillis() - tupleInfo.getTimestamp(); - } + this.values = tupleInfo.getValues(); + + this.tuple = tuple; + this.tupleInfo = tupleInfo; } public void run() { @@ -73,7 +77,15 @@ public class AckSpoutMsg implements IAckMsg { spout.ack(msgId); } - task_stats.spout_acked_tuple(stream, timeStamp); + long latency = 0, lifeCycle = 0; + if (tupleInfo.getTimestamp() != 0) { + long endTime = System.nanoTime(); + latency = (endTime - tupleInfo.getTimestamp())/TimeUtils.NS_PER_US; + if (tuple != null && tuple instanceof TupleExt) { + lifeCycle = (System.currentTimeMillis() - ((TupleExt) tuple).getCreationTimeStamp()) * TimeUtils.NS_PER_US; + } + } + task_stats.spout_acked_tuple(stream, latency, lifeCycle); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java index 7b5d37b..f570a74 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java @@ -40,8 +40,7 @@ public class FailSpoutMsg implements IAckMsg { private TaskBaseMetric task_stats; private boolean isDebug = false; - public FailSpoutMsg(Object id, ISpout _spout, TupleInfo _tupleInfo, - TaskBaseMetric _task_stats, boolean _isDebug) { + public FailSpoutMsg(Object id, ISpout _spout, TupleInfo _tupleInfo, TaskBaseMetric _task_stats, boolean _isDebug) { this.id = id; this.spout = _spout; this.tupleInfo = _tupleInfo; @@ -63,8 +62,7 @@ public class FailSpoutMsg implements IAckMsg { task_stats.spout_failed_tuple(tupleInfo.getStream()); if (isDebug) { - LOG.info("Failed message rootId: {}, messageId:{} : {}", id, - msg_id, tupleInfo.getValues().toString()); + LOG.info("Failed message rootId: {}, messageId:{} : {}", id, msg_id, tupleInfo.getValues().toString()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java index a74079f..8edd3cc 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java @@ -17,19 +17,13 @@ */ package com.alibaba.jstorm.task.execute.spout; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.task.TopologyContext; import backtype.storm.utils.DisruptorQueue; import backtype.storm.utils.WorkerClassLoader; - import com.alibaba.jstorm.callback.AsyncLoopRunnable; import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.callback.RunnableCallback; +import com.alibaba.jstorm.metric.JStormMetricsReporter; import com.alibaba.jstorm.task.Task; import com.alibaba.jstorm.task.TaskBaseMetric; import com.alibaba.jstorm.task.TaskStatus; @@ -39,35 +33,36 @@ import com.alibaba.jstorm.task.comm.TaskSendTargets; import com.alibaba.jstorm.task.comm.TupleInfo; import com.alibaba.jstorm.task.error.ITaskReportErr; import com.alibaba.jstorm.utils.RotatingMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * spout executor - * + * <p/> * All spout actions will be done here * * @author yannian/Longda - * */ public class MultipleThreadSpoutExecutors extends SpoutExecutors { - private static Logger LOG = LoggerFactory - .getLogger(MultipleThreadSpoutExecutors.class); - - public MultipleThreadSpoutExecutors(Task task, - backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn, - Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf, - TaskSendTargets sendTargets, TaskStatus taskStatus, - TopologyContext topology_context, TopologyContext _user_context, - TaskBaseMetric _task_stats, ITaskReportErr _report_error) { - super(task, _spout, _transfer_fn, innerTaskTransfer, _storm_conf, - sendTargets, taskStatus, topology_context, _user_context, - _task_stats, _report_error); - - ackerRunnableThread = new AsyncLoopThread(new AckerRunnable()); - pending = - new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, - null, false); - - super.prepare(sendTargets, _transfer_fn, topology_context); + private static Logger LOG = LoggerFactory.getLogger(MultipleThreadSpoutExecutors.class); + + public MultipleThreadSpoutExecutors(Task task) { + super(task); + + ackerRunnableThread = new AsyncLoopThread(new AckerRunnable(), false, Thread.NORM_PRIORITY, false); + } + + public void mkPending() { + pending = new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, null, false); + } + + @Override + public void init() throws Exception { + super.init(); + ackerRunnableThread.start(); } @Override @@ -75,11 +70,14 @@ public class MultipleThreadSpoutExecutors extends SpoutExecutors { return idStr + "-" + MultipleThreadSpoutExecutors.class.getSimpleName(); } - @Override - public void run() { + @Override + public void run() { + if (isFinishInit == false) { + initWrapper(); + } - super.nextTuple(); - } + super.nextTuple(); + } class AckerRunnable extends RunnableCallback { @@ -117,7 +115,7 @@ public class MultipleThreadSpoutExecutors extends SpoutExecutors { } } - + LOG.info("Successfully shutdown Spout's acker thread " + idStr); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java index 9e4dd21..144b041 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java @@ -17,15 +17,9 @@ */ package com.alibaba.jstorm.task.execute.spout; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.task.TopologyContext; import backtype.storm.utils.DisruptorQueue; -import backtype.storm.utils.WorkerClassLoader; - +import com.alibaba.jstorm.metric.JStormMetricsReporter; import com.alibaba.jstorm.task.Task; import com.alibaba.jstorm.task.TaskBaseMetric; import com.alibaba.jstorm.task.TaskStatus; @@ -35,35 +29,30 @@ import com.alibaba.jstorm.task.comm.TaskSendTargets; import com.alibaba.jstorm.task.comm.TupleInfo; import com.alibaba.jstorm.task.error.ITaskReportErr; import com.alibaba.jstorm.utils.RotatingMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; /** * spout executor - * + * <p/> * All spout actions will be done here * * @author yannian/Longda - * */ public class SingleThreadSpoutExecutors extends SpoutExecutors { - private static Logger LOG = LoggerFactory - .getLogger(SingleThreadSpoutExecutors.class); + private static Logger LOG = LoggerFactory.getLogger(SingleThreadSpoutExecutors.class); - public SingleThreadSpoutExecutors(Task task, - backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn, - Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf, - TaskSendTargets sendTargets, TaskStatus taskStatus, - TopologyContext topology_context, TopologyContext _user_context, - TaskBaseMetric _task_stats, ITaskReportErr _report_error) { - super(task, _spout, _transfer_fn, innerTaskTransfer, _storm_conf, - sendTargets, taskStatus, topology_context, _user_context, - _task_stats, _report_error); + public SingleThreadSpoutExecutors(Task task) { + super(task); - // sending Tuple's TimeCacheMap - pending = - new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, - null, true); - - super.prepare(sendTargets, _transfer_fn, topology_context); + } + + @Override + public void mkPending() { + // sending Tuple's TimeCacheMap + pending = new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, null, true); } @Override @@ -73,10 +62,14 @@ public class SingleThreadSpoutExecutors extends SpoutExecutors { @Override public void run() { + if (isFinishInit == false ) { + initWrapper(); + } + executeEvent(); super.nextTuple(); - + processControlEvent(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java index d913a9e..baf709f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java @@ -25,16 +25,12 @@ import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.Config; -import backtype.storm.spout.ISpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.MessageId; -import backtype.storm.tuple.TupleImplExt; -import backtype.storm.utils.DisruptorQueue; - -import com.alibaba.jstorm.common.metric.Histogram; +import com.alibaba.jstorm.common.metric.AsmHistogram; import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.task.Task; import com.alibaba.jstorm.task.TaskBaseMetric; import com.alibaba.jstorm.task.TaskTransfer; import com.alibaba.jstorm.task.acker.Acker; @@ -44,12 +40,20 @@ import com.alibaba.jstorm.task.comm.UnanchoredSend; import com.alibaba.jstorm.task.error.ITaskReportErr; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.TimeOutMap; +import com.alibaba.jstorm.utils.TimeUtils; + +import backtype.storm.Config; +import backtype.storm.spout.ISpout; +import backtype.storm.spout.ISpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.MessageId; +import backtype.storm.tuple.TupleImplExt; +import backtype.storm.utils.DisruptorQueue; /** * spout collector, sending tuple through this Object * * @author yannian/Longda - * */ public class SpoutCollector implements ISpoutOutputCollector { private static Logger LOG = LoggerFactory.getLogger(SpoutCollector.class); @@ -71,64 +75,53 @@ public class SpoutCollector implements ISpoutOutputCollector { private Integer ackerNum; private boolean isDebug = false; - private Histogram emitTotalTimer; + private AsmHistogram emitTotalTimer; Random random; - public SpoutCollector(Integer task_id, backtype.storm.spout.ISpout spout, - TaskBaseMetric task_stats, TaskSendTargets sendTargets, - Map _storm_conf, TaskTransfer _transfer_fn, - TimeOutMap<Long, TupleInfo> pending, - TopologyContext topology_context, - DisruptorQueue disruptorAckerQueue, ITaskReportErr _report_error) { - this.sendTargets = sendTargets; - this.storm_conf = _storm_conf; - this.transfer_fn = _transfer_fn; + //Integer task_id, backtype.storm.spout.ISpout spout, TaskBaseMetric task_stats, TaskSendTargets sendTargets, Map _storm_conf, + //TaskTransfer _transfer_fn, TimeOutMap<Long, TupleInfo> pending, TopologyContext topology_context, DisruptorQueue disruptorAckerQueue, + //ITaskReportErr _report_error + public SpoutCollector(Task task, TimeOutMap<Long, TupleInfo> pending, DisruptorQueue disruptorAckerQueue) { + this.sendTargets = task.getTaskSendTargets(); + this.storm_conf = task.getStormConf(); + this.transfer_fn = task.getTaskTransfer(); this.pending = pending; - this.topology_context = topology_context; + this.topology_context = task.getTopologyContext(); this.disruptorAckerQueue = disruptorAckerQueue; - this.task_stats = task_stats; - this.spout = spout; - this.task_id = task_id; - this.report_error = _report_error; + this.task_stats = task.getTaskStats(); + this.spout = (ISpout)task.getTaskObj(); + this.task_id = task.getTaskId(); + this.report_error = task.getReportErrorDie(); - ackerNum = - JStormUtils.parseInt(storm_conf - .get(Config.TOPOLOGY_ACKER_EXECUTORS)); - isDebug = - JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), - false); + ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS)); + isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false); random = new Random(); random.setSeed(System.currentTimeMillis()); String componentId = topology_context.getThisComponentId(); emitTotalTimer = - JStormMetrics.registerTaskHistogram(task_id, - MetricDef.COLLECTOR_EMIT_TIME); - + (AsmHistogram) JStormMetrics + .registerTaskMetric(MetricUtils.taskMetricName(topology_context.getTopologyId(), componentId, task_id, MetricDef.COLLECTOR_EMIT_TIME, + MetricType.HISTOGRAM), new AsmHistogram()); } @Override - public List<Integer> emit(String streamId, List<Object> tuple, - Object messageId) { + public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { return sendSpoutMsg(streamId, tuple, messageId, null); } @Override - public void emitDirect(int taskId, String streamId, List<Object> tuple, - Object messageId) { + public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { sendSpoutMsg(streamId, tuple, messageId, taskId); } - private List<Integer> sendSpoutMsg(String out_stream_id, - List<Object> values, Object message_id, Integer out_task_id) { - - long startTime = System.nanoTime(); - + private List<Integer> sendSpoutMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id) { + final long startTime = System.nanoTime(); try { - java.util.List<Integer> out_tasks = null; + List<Integer> out_tasks; if (out_task_id != null) { out_tasks = sendTargets.get(out_task_id, out_stream_id, values); } else { @@ -148,7 +141,7 @@ public class SpoutCollector implements ISpoutOutputCollector { // when duplicate root_id, it will miss call ack/fail Long root_id = MessageId.generateId(random); if (needAck) { - while (pending.containsKey(root_id) == true) { + while (pending.containsKey(root_id)) { root_id = MessageId.generateId(random); } } @@ -163,30 +156,23 @@ public class SpoutCollector implements ISpoutOutputCollector { msgid = MessageId.makeUnanchored(); } - TupleImplExt tp = - new TupleImplExt(topology_context, values, task_id, - out_stream_id, msgid); + TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid); tp.setTargetTaskId(t); transfer_fn.transfer(tp); - } if (needAck) { - TupleInfo info = new TupleInfo(); info.setStream(out_stream_id); info.setValues(values); info.setMessageId(message_id); - info.setTimestamp(System.currentTimeMillis()); + info.setTimestamp(System.nanoTime()); pending.putHead(root_id, info); - List<Object> ackerTuple = - JStormUtils.mk_list((Object) root_id, - JStormUtils.bit_xor_vals(ackSeq), task_id); + List<Object> ackerTuple = JStormUtils.mk_list((Object) root_id, JStormUtils.bit_xor_vals(ackSeq), task_id); - UnanchoredSend.send(topology_context, sendTargets, transfer_fn, - Acker.ACKER_INIT_STREAM_ID, ackerTuple); + UnanchoredSend.send(topology_context, sendTargets, transfer_fn, Acker.ACKER_INIT_STREAM_ID, ackerTuple); } else if (message_id != null) { TupleInfo info = new TupleInfo(); @@ -195,23 +181,20 @@ public class SpoutCollector implements ISpoutOutputCollector { info.setMessageId(message_id); info.setTimestamp(0); - AckSpoutMsg ack = - new AckSpoutMsg(spout, info, task_stats, isDebug); + AckSpoutMsg ack = new AckSpoutMsg(spout, null, info, task_stats, isDebug); ack.run(); - } return out_tasks; } finally { long endTime = System.nanoTime(); - emitTotalTimer.update((endTime - startTime)/1000000.0d); + emitTotalTimer.update((endTime - startTime) / TimeUtils.NS_PER_US); } } @Override public void reportError(Throwable error) { - // TODO Auto-generated method stub report_error.report(error); }
