http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java index 05f745c..4bfbf2d 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java @@ -17,53 +17,56 @@ */ package com.alibaba.jstorm.task.execute.spout; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.Config; -import backtype.storm.Constants; -import backtype.storm.spout.ISpoutOutputCollector; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.BatchTuple; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.DisruptorQueue; -import backtype.storm.utils.WorkerClassLoader; - import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.common.metric.Histogram; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.common.metric.AsmHistogram; import com.alibaba.jstorm.common.metric.TimerRatio; import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger; import com.alibaba.jstorm.daemon.worker.timer.TimerConstants; import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger; import com.alibaba.jstorm.metric.JStormMetrics; +import com.alibaba.jstorm.metric.JStormMetricsReporter; import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; import com.alibaba.jstorm.task.Task; -import com.alibaba.jstorm.task.TaskBaseMetric; -import com.alibaba.jstorm.task.TaskStatus; -import com.alibaba.jstorm.task.TaskTransfer; import com.alibaba.jstorm.task.TaskBatchTransfer; +import com.alibaba.jstorm.task.TaskTransfer; import com.alibaba.jstorm.task.acker.Acker; import com.alibaba.jstorm.task.comm.TaskSendTargets; import com.alibaba.jstorm.task.comm.TupleInfo; -import com.alibaba.jstorm.task.error.ITaskReportErr; import com.alibaba.jstorm.task.execute.BaseExecutors; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable; +import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.RotatingMap; +import com.alibaba.jstorm.utils.TimeUtils; +import com.codahale.metrics.Gauge; import com.lmax.disruptor.EventHandler; +import backtype.storm.Config; +import backtype.storm.Constants; +import backtype.storm.spout.ISpout; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.BatchTuple; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.WorkerClassLoader; + /** * spout executor - * + * <p/> * All spout actions will be done here - * + * * @author yannian/Longda - * */ public class SpoutExecutors extends BaseExecutors implements EventHandler { private static Logger LOG = LoggerFactory.getLogger(SpoutExecutors.class); @@ -73,123 +76,107 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler { protected backtype.storm.spout.ISpout spout; protected RotatingMap<Long, TupleInfo> pending; - protected ISpoutOutputCollector output_collector; + protected SpoutOutputCollector outputCollector; - protected boolean firstTime = true; + protected AsmHistogram nextTupleTimer; + protected AsmHistogram ackerTimer; + protected TimerRatio emptyCpuGauge; - protected Histogram nextTupleTimer; - protected Histogram ackerTimer; - protected TimerRatio emptyCpuCounter; + private String topologyId; + private String componentId; + private int taskId; protected AsyncLoopThread ackerRunnableThread; protected boolean isSpoutFullSleep; - public SpoutExecutors(Task task, backtype.storm.spout.ISpout _spout, - TaskTransfer _transfer_fn, - Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf, - TaskSendTargets sendTargets, TaskStatus taskStatus, - TopologyContext topology_context, TopologyContext _user_context, - TaskBaseMetric _task_stats, ITaskReportErr _report_error) { - super(task, _transfer_fn, _storm_conf, innerTaskTransfer, - topology_context, _user_context, _task_stats, taskStatus, - _report_error); + //, backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn, Map<Integer, DisruptorQueue> innerTaskTransfer, + //Map _storm_conf, TaskSendTargets sendTargets, TaskStatus taskStatus, TopologyContext topology_context, TopologyContext _user_context, + //TaskBaseMetric _task_stats, ITaskReportErr _report_error, JStormMetricsReporter metricReporter + public SpoutExecutors(Task task) { + super(task); + + this.spout = (ISpout)task.getTaskObj(); - this.spout = _spout; + this.max_spout_pending = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)); - this.max_spout_pending = - JStormUtils.parseInt(storm_conf - .get(Config.TOPOLOGY_MAX_SPOUT_PENDING)); + this.topologyId = sysTopologyCtx.getTopologyId(); + this.componentId = sysTopologyCtx.getThisComponentId(); + this.taskId = task.getTaskId(); this.nextTupleTimer = - JStormMetrics.registerTaskHistogram(taskId, - MetricDef.EXECUTE_TIME); + (AsmHistogram) JStormMetrics.registerTaskMetric( + MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_TIME, MetricType.HISTOGRAM), new AsmHistogram()); this.ackerTimer = - JStormMetrics.registerTaskHistogram(taskId, - MetricDef.ACKER_TIME); + (AsmHistogram) JStormMetrics.registerTaskMetric( + MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.ACKER_TIME, MetricType.HISTOGRAM), new AsmHistogram()); - this.emptyCpuCounter = new TimerRatio(); - JStormMetrics.registerTaskGauge(emptyCpuCounter, taskId, - MetricDef.EMPTY_CPU_RATIO); + this.emptyCpuGauge = new TimerRatio(); + JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EMPTY_CPU_RATIO, MetricType.GAUGE), + new AsmGauge(emptyCpuGauge)); isSpoutFullSleep = ConfigExtension.isSpoutPendFullSleep(storm_conf); - if (ConfigExtension.isTaskBatchTuple(storm_conf)) { - TaskBatchFlushTrigger batchFlushTrigger = - new TaskBatchFlushTrigger(5, idStr - + Constants.SYSTEM_COMPONENT_ID, - (TaskBatchTransfer) _transfer_fn); - batchFlushTrigger.register(TimeUnit.MILLISECONDS); - } - LOG.info("isSpoutFullSleep:" + isSpoutFullSleep); - - } - - public void prepare(TaskSendTargets sendTargets, TaskTransfer transferFn, - TopologyContext topologyContext) { - - JStormMetrics.registerTaskGauge( - new com.codahale.metrics.Gauge<Double>() { - + + mkPending(); + + JStormMetrics.registerTaskMetric( + MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.PENDING_MAP, MetricType.GAUGE), new AsmGauge( + new Gauge<Double>() { @Override public Double getValue() { return (double) pending.size(); } - - }, taskId, MetricDef.PENDING_MAP); + })); // collector, in fact it call send_spout_msg - this.output_collector = - new SpoutCollector(taskId, spout, task_stats, sendTargets, - storm_conf, transferFn, pending, topologyContext, - exeQueue, report_error); - - try { - WorkerClassLoader.switchThreadContext(); - this.spout.open(storm_conf, userTopologyCtx, - new SpoutOutputCollector(output_collector)); - } catch (Throwable e) { - error = e; - LOG.error("spout open error ", e); - report_error.report(e); - } finally { - WorkerClassLoader.restoreThreadContext(); - } + SpoutCollector collector = new SpoutCollector(task, pending, exeQueue); + this.outputCollector = new SpoutOutputCollector(collector); + taskTransfer.getBackpressureController().setOutputCollector(outputCollector); + taskHbTrigger.setSpoutOutputCollector(outputCollector); LOG.info("Successfully create SpoutExecutors " + idStr); - + } + + public void mkPending() { + // this function will be override by subclass + throw new RuntimeException("Should override this function"); } - public void nextTuple() { - if (firstTime == true) { + @Override + public void init() throws Exception { + + this.spout.open(storm_conf, userTopologyCtx, outputCollector); - int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf); + LOG.info("Successfully open SpoutExecutors " + idStr); + + int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf); - // wait other bolt is ready - JStormUtils.sleepMs(delayRun * 1000); + // wait other bolt is ready + JStormUtils.sleepMs(delayRun * 1000); - emptyCpuCounter.init(); + if (taskStatus.isRun()) { + spout.activate(); + } else { + spout.deactivate(); + } - if (taskStatus.isRun() == true) { - spout.activate(); - } else { - spout.deactivate(); - } + LOG.info(idStr + " is ready "); - firstTime = false; - LOG.info(idStr + " is ready "); - } + } - if (taskStatus.isRun() == false) { + public void nextTuple() { + + if (!taskStatus.isRun()) { JStormUtils.sleepMs(1); return; } // if don't need ack, pending map will be always empty if (max_spout_pending == null || pending.size() < max_spout_pending) { - emptyCpuCounter.stop(); + emptyCpuGauge.stop(); long start = System.nanoTime(); try { @@ -200,15 +187,13 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler { report_error.report(e); } finally { long end = System.nanoTime(); - nextTupleTimer.update((end - start) / 1000000.0d); + nextTupleTimer.update((end - start) / TimeUtils.NS_PER_US); } - - return; } else { if (isSpoutFullSleep) { JStormUtils.sleepMs(1); } - emptyCpuCounter.start(); + emptyCpuGauge.start(); // just return, no sleep } } @@ -221,25 +206,23 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler { /** * Handle acker message - * - * @see com.lmax.disruptor.EventHandler#onEvent(java.lang.Object, long, - * boolean) + * + * @see EventHandler#onEvent(Object, long, boolean) */ @Override - public void onEvent(Object event, long sequence, boolean endOfBatch) - throws Exception { + public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception { long start = System.nanoTime(); try { - if (event == null) { return; } - Runnable runnable = null; if (event instanceof Tuple) { + processControlEvent(); runnable = processTupleEvent((Tuple) event); } else if (event instanceof BatchTuple) { for (Tuple tuple : ((BatchTuple) event).getTuples()) { + processControlEvent(); runnable = processTupleEvent(tuple); if (runnable != null) { runnable.run(); @@ -257,8 +240,7 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler { runnable = (Runnable) event; } else { - LOG.warn("Receive one unknow event-" + event.toString() + " " - + idStr); + LOG.warn("Receive one unknow event-" + event.toString() + " " + idStr); return; } @@ -272,42 +254,43 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler { } } finally { long end = System.nanoTime(); - ackerTimer.update((end - start) / 1000000.0d); + ackerTimer.update((end - start) / TimeUtils.NS_PER_US); } } private Runnable processTupleEvent(Tuple event) { - Runnable runnable; + Runnable runnable = null; Tuple tuple = (Tuple) event; - Object id = tuple.getValue(0); - Object obj = pending.remove((Long) id); - - if (obj == null) { - if (isDebug) { - LOG.info("Pending map no entry:" + id); - } - runnable = null; + if (event.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) { + TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) tuple.getValueByField("ctrlEvent"); + taskTransfer.getBackpressureController().control(ctrlEvent); } else { - TupleInfo tupleInfo = (TupleInfo) obj; + Object id = tuple.getValue(0); + Object obj = pending.remove((Long) id); - String stream_id = tuple.getSourceStreamId(); + if (obj == null) { + if (isDebug) { + LOG.info("Pending map no entry:" + id); + } + runnable = null; + } else { + TupleInfo tupleInfo = (TupleInfo) obj; - if (stream_id.equals(Acker.ACKER_ACK_STREAM_ID)) { + String stream_id = tuple.getSourceStreamId(); - runnable = - new AckSpoutMsg(spout, tupleInfo, task_stats, isDebug); - } else if (stream_id.equals(Acker.ACKER_FAIL_STREAM_ID)) { - runnable = - new FailSpoutMsg(id, spout, tupleInfo, task_stats, - isDebug); - } else { - LOG.warn("Receive one unknow source Tuple " + idStr); - runnable = null; + if (stream_id.equals(Acker.ACKER_ACK_STREAM_ID)) { + + runnable = new AckSpoutMsg(spout, tuple, tupleInfo, task_stats, isDebug); + } else if (stream_id.equals(Acker.ACKER_FAIL_STREAM_ID)) { + runnable = new FailSpoutMsg(id, spout, tupleInfo, task_stats, isDebug); + } else { + LOG.warn("Receive one unknow source Tuple " + idStr); + runnable = null; + } } - } - task_stats.recv_tuple(tuple.getSourceComponent(), - tuple.getSourceStreamId()); + task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId()); + } return runnable; } @@ -317,28 +300,23 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler { private void processTimerEvent(TimerTrigger.TimerEvent event) { switch (event.getOpCode()) { - case TimerConstants.ROTATING_MAP: { - Map<Long, TupleInfo> timeoutMap = pending.rotate(); - for (java.util.Map.Entry<Long, TupleInfo> entry : timeoutMap - .entrySet()) { - TupleInfo tupleInfo = entry.getValue(); - FailSpoutMsg fail = - new FailSpoutMsg(entry.getKey(), spout, - (TupleInfo) tupleInfo, task_stats, isDebug); - fail.run(); + case TimerConstants.ROTATING_MAP: { + Map<Long, TupleInfo> timeoutMap = pending.rotate(); + for (Map.Entry<Long, TupleInfo> entry : timeoutMap.entrySet()) { + TupleInfo tupleInfo = entry.getValue(); + FailSpoutMsg fail = new FailSpoutMsg(entry.getKey(), spout, (TupleInfo) tupleInfo, task_stats, isDebug); + fail.run(); + } + break; + } + case TimerConstants.TASK_HEARTBEAT: { + taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs()); + break; + } + default: { + LOG.warn("Receive unsupported timer event, opcode=" + event.getOpCode()); + break; } - break; - } - case TimerConstants.TASK_HEARTBEAT: { - Integer taskId = (Integer) event.getMsg(); - TaskHeartbeatRunable.updateTaskHbStats(taskId, task); - break; - } - default: { - LOG.warn("Receive unsupported timer event, opcode=" - + event.getOpCode()); - break; - } } } @@ -349,9 +327,12 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler { if (event instanceof TimerTrigger.TimerEvent) { processTimerEvent((TimerTrigger.TimerEvent) event); } else { - LOG.warn("Received unknown control event, " - + event.getClass().getName()); + LOG.warn("Received unknown control event, " + event.getClass().getName()); } } } + + public Object getOutputCollector() { + return outputCollector; + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java index 968831b..b64bc30 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java @@ -31,8 +31,7 @@ import com.alibaba.jstorm.utils.ExpiredCallback; import com.alibaba.jstorm.utils.JStormUtils; public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> { - private static Logger LOG = LoggerFactory - .getLogger(SpoutTimeoutCallBack.class); + private static Logger LOG = LoggerFactory.getLogger(SpoutTimeoutCallBack.class); private DisruptorQueue disruptorEventQueue; private backtype.storm.spout.ISpout spout; @@ -40,16 +39,12 @@ public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> { private TaskBaseMetric task_stats; private boolean isDebug; - public SpoutTimeoutCallBack(DisruptorQueue disruptorEventQueue, - backtype.storm.spout.ISpout _spout, Map _storm_conf, - TaskBaseMetric stat) { + public SpoutTimeoutCallBack(DisruptorQueue disruptorEventQueue, backtype.storm.spout.ISpout _spout, Map _storm_conf, TaskBaseMetric stat) { this.storm_conf = _storm_conf; this.disruptorEventQueue = disruptorEventQueue; this.spout = _spout; this.task_stats = stat; - this.isDebug = - JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), - false); + this.isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false); } /** @@ -62,9 +57,7 @@ public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> { } try { TupleInfo tupleInfo = (TupleInfo) val; - FailSpoutMsg fail = - new FailSpoutMsg(key, spout, (TupleInfo) tupleInfo, - task_stats, isDebug); + FailSpoutMsg fail = new FailSpoutMsg(key, spout, (TupleInfo) tupleInfo, task_stats, isDebug); disruptorEventQueue.publish(fail); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java index bb6ad9c..46eefe8 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java @@ -34,9 +34,7 @@ public class MkCustomGrouper { private int myTaskId; - public MkCustomGrouper(TopologyContext context, - CustomStreamGrouping _grouping, GlobalStreamId stream, - List<Integer> targetTask, int myTaskId) { + public MkCustomGrouper(TopologyContext context, CustomStreamGrouping _grouping, GlobalStreamId stream, List<Integer> targetTask, int myTaskId) { this.myTaskId = myTaskId; this.grouping = _grouping; this.grouping.prepare(context, stream, targetTask); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java index 3bf6518..66f2567 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java @@ -26,17 +26,16 @@ import com.alibaba.jstorm.utils.JStormUtils; /** * field grouping - * + * * @author yannian - * + * */ public class MkFieldsGrouper { private Fields out_fields; private Fields group_fields; private List<Integer> out_tasks; - public MkFieldsGrouper(Fields _out_fields, Fields _group_fields, - List<Integer> _out_tasks) { + public MkFieldsGrouper(Fields _out_fields, Fields _group_fields, List<Integer> _out_tasks) { for (Iterator<String> it = _group_fields.iterator(); it.hasNext();) { String groupField = it.next(); @@ -52,8 +51,7 @@ public class MkFieldsGrouper { } public List<Integer> grouper(List<Object> values) { - int hashcode = - this.out_fields.select(this.group_fields, values).hashCode(); + int hashcode = this.out_fields.select(this.group_fields, values).hashCode(); int group = Math.abs(hashcode % this.out_tasks.size()); return JStormUtils.mk_list(out_tasks.get(group)); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java index 5408afd..30d641c 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java @@ -17,14 +17,6 @@ */ package com.alibaba.jstorm.task.group; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.generated.GlobalStreamId; import backtype.storm.generated.Grouping; import backtype.storm.generated.JavaObject; @@ -32,11 +24,17 @@ import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.daemon.worker.WorkerData; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.RandomRange; import com.alibaba.jstorm.utils.Thrift; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; /** * Grouper, get which task should be send to for one tuple @@ -66,9 +64,8 @@ public class MkGrouper { private MkLocalShuffer local_shuffer_grouper; private MkLocalFirst localFirst; - public MkGrouper(TopologyContext _topology_context, Fields _out_fields, - Grouping _thrift_grouping, List<Integer> _outTasks, - String streamId, WorkerData workerData) { + public MkGrouper(TopologyContext _topology_context, Fields _out_fields, Grouping _thrift_grouping, List<Integer> _outTasks, String streamId, + WorkerData workerData) { this.topology_context = _topology_context; this.out_fields = _out_fields; this.thrift_grouping = _thrift_grouping; @@ -83,8 +80,7 @@ public class MkGrouper { this.grouptype = this.parseGroupType(workerData); String id = _topology_context.getThisTaskId() + ":" + streamId; - LOG.info(id + " grouptype is " + grouptype + ", out_tasks is " - + out_tasks + ", local_tasks" + local_tasks); + LOG.info(id + " grouptype is " + grouptype + ", out_tasks is " + out_tasks + ", local_tasks" + local_tasks); } @@ -104,12 +100,10 @@ public class MkGrouper { grouperType = GrouperType.global; } else { - List<String> fields_group = - Thrift.fieldGrouping(thrift_grouping); + List<String> fields_group = Thrift.fieldGrouping(thrift_grouping); Fields fields = new Fields(fields_group); - fields_grouper = - new MkFieldsGrouper(out_fields, fields, out_tasks); + fields_grouper = new MkFieldsGrouper(out_fields, fields, out_tasks); // hashcode by fields grouperType = GrouperType.fields; @@ -132,29 +126,23 @@ public class MkGrouper { int myTaskId = topology_context.getThisTaskId(); String componentId = topology_context.getComponentId(myTaskId); GlobalStreamId stream = new GlobalStreamId(componentId, streamId); - custom_grouper = - new MkCustomGrouper(topology_context, g, stream, out_tasks, - myTaskId); + custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId); grouperType = GrouperType.custom_obj; } else if (Grouping._Fields.CUSTOM_SERIALIZED.equals(fields)) { // user custom group by serialized Object byte[] obj = thrift_grouping.get_custom_serialized(); - CustomStreamGrouping g = - (CustomStreamGrouping) Utils.javaDeserialize(obj); + CustomStreamGrouping g = (CustomStreamGrouping) Utils.javaDeserialize(obj); int myTaskId = topology_context.getThisTaskId(); String componentId = topology_context.getComponentId(myTaskId); GlobalStreamId stream = new GlobalStreamId(componentId, streamId); - custom_grouper = - new MkCustomGrouper(topology_context, g, stream, out_tasks, - myTaskId); + custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId); grouperType = GrouperType.custom_serialized; } else if (Grouping._Fields.DIRECT.equals(fields)) { // directly send to a special task grouperType = GrouperType.direct; } else if (Grouping._Fields.LOCAL_OR_SHUFFLE.equals(fields)) { grouperType = GrouperType.local_or_shuffle; - local_shuffer_grouper = - new MkLocalShuffer(local_tasks, out_tasks, workerData); + local_shuffer_grouper = new MkLocalShuffer(local_tasks, out_tasks, workerData); } else if (Grouping._Fields.LOCAL_FIRST.equals(fields)) { grouperType = GrouperType.localFirst; localFirst = new MkLocalFirst(local_tasks, out_tasks, workerData); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java index 56f9175..92fa18b 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java @@ -40,8 +40,7 @@ import com.alibaba.jstorm.utils.RandomRange; * @version */ public class MkLocalFirst extends Shuffer { - private static final Logger LOG = LoggerFactory - .getLogger(MkLocalFirst.class); + private static final Logger LOG = LoggerFactory.getLogger(MkLocalFirst.class); private List<Integer> allOutTasks = new ArrayList<Integer>(); private List<Integer> localOutTasks = new ArrayList<Integer>(); @@ -52,8 +51,7 @@ public class MkLocalFirst extends Shuffer { private WorkerData workerData; private IntervalCheck intervalCheck; - public MkLocalFirst(List<Integer> workerTasks, List<Integer> allOutTasks, - WorkerData workerData) { + public MkLocalFirst(List<Integer> workerTasks, List<Integer> allOutTasks, WorkerData workerData) { super(workerData); intervalCheck = new IntervalCheck(); @@ -74,7 +72,6 @@ public class MkLocalFirst extends Shuffer { if (localWorkerOutTasks.size() != 0) { isLocalWorkerAvail = true; localOutTasks.addAll(localWorkerOutTasks); - remoteOutTasks.removeAll(localWorkerOutTasks); } else { isLocalWorkerAvail = false; } @@ -93,8 +90,7 @@ public class MkLocalFirst extends Shuffer { for (i = 0; i < size; i++) { Integer taskId = outTasks.get(index); boolean taskStatus = workerData.isOutboundTaskActive(taskId); - DisruptorQueue exeQueue = - (workerData.getInnerTaskTransfer().get(taskId)); + DisruptorQueue exeQueue = (workerData.getInnerTaskTransfer().get(taskId)); float queueLoadRatio = exeQueue != null ? exeQueue.pctFull() : 0; if (taskStatus && queueLoadRatio < 1.0) break; @@ -123,7 +119,6 @@ public class MkLocalFirst extends Shuffer { } return JStormUtils.mk_list(remoteOutTasks.get(index)); } - public List<Integer> grouper(List<Object> values) { List<Integer> ret; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java index 324e1e6..c57d380 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java @@ -1,37 +1,28 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package com.alibaba.jstorm.task.group; import java.util.ArrayList; import java.util.List; +import java.util.Set; + +import org.apache.log4j.Logger; import com.alibaba.jstorm.daemon.worker.WorkerData; +import com.alibaba.jstorm.utils.IntervalCheck; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.RandomRange; public class MkLocalShuffer extends Shuffer { + private static final Logger LOG = Logger.getLogger(MkLocalShuffer.class); private List<Integer> outTasks; private RandomRange randomrange; + private Set<Integer> lastLocalNodeTasks; + private IntervalCheck intervalCheck; + private WorkerData workerData; private boolean isLocal; public MkLocalShuffer(List<Integer> workerTasks, List<Integer> allOutTasks, - WorkerData workerData) { + WorkerData workerData) { super(workerData); List<Integer> localOutTasks = new ArrayList<Integer>(); @@ -40,6 +31,9 @@ public class MkLocalShuffer extends Shuffer { localOutTasks.add(outTask); } } + this.workerData = workerData; + intervalCheck = new IntervalCheck(); + intervalCheck.setInterval(60); if (localOutTasks.size() != 0) { this.outTasks = localOutTasks; @@ -47,13 +41,43 @@ public class MkLocalShuffer extends Shuffer { } else { this.outTasks = new ArrayList<Integer>(); this.outTasks.addAll(allOutTasks); + refreshLocalNodeTasks(); isLocal = false; } + randomrange = new RandomRange(outTasks.size()); + } + + /** + * Don't need to take care of multiple thread, One task one thread + */ + private void refreshLocalNodeTasks() { + Set<Integer> localNodeTasks = workerData.getLocalNodeTasks(); + + if (localNodeTasks == null || localNodeTasks.equals(lastLocalNodeTasks) ) { + return; + } + LOG.info("Old localNodeTasks:" + lastLocalNodeTasks + ", new:" + + localNodeTasks); + lastLocalNodeTasks = localNodeTasks; + + List<Integer> localNodeOutTasks = new ArrayList<Integer>(); + for (Integer outTask : outTasks) { + if (localNodeTasks.contains(outTask)) { + localNodeOutTasks.add(outTask); + } + } + + if (localNodeOutTasks.isEmpty() == false) { + this.outTasks = localNodeOutTasks; + } randomrange = new RandomRange(outTasks.size()); } public List<Integer> grouper(List<Object> values) { + if (!isLocal && intervalCheck.check()) { + refreshLocalNodeTasks(); + } int index = getActiveTask(randomrange, outTasks); // If none active tasks were found, still send message to a task if (index == -1) http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java index acad674..3d272bf 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java @@ -38,8 +38,7 @@ public abstract class Shuffer { int i = 0; for (i = 0; i < size; i++) { - if (workerData.isOutboundTaskActive(Integer.valueOf(outTasks - .get(index)))) + if (workerData.isOutboundTaskActive(Integer.valueOf(outTasks.get(index)))) break; else index = randomrange.nextInt(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java deleted file mode 100755 index 532f553..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.task.heartbeat; - -import java.io.Serializable; - -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.commons.lang.builder.ToStringStyle; - -/** - * Task heartbeat, this Object will be updated to ZK timely - * - * @author yannian - * - */ -public class TaskHeartbeat implements Serializable { - - private static final long serialVersionUID = -6369195955255963810L; - private Integer timeSecs; - private Integer uptimeSecs; - - public TaskHeartbeat(int timeSecs, int uptimeSecs) { - this.timeSecs = timeSecs; - this.uptimeSecs = uptimeSecs; - } - - public int getTimeSecs() { - return timeSecs; - } - - @Override - public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); - } - - public void setTimeSecs(int timeSecs) { - this.timeSecs = timeSecs; - } - - public int getUptimeSecs() { - return uptimeSecs; - } - - public void setUptimeSecs(int uptimeSecs) { - this.uptimeSecs = uptimeSecs; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = - prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode()); - result = - prime * result - + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - TaskHeartbeat other = (TaskHeartbeat) obj; - if (timeSecs == null) { - if (other.timeSecs != null) - return false; - } else if (!timeSecs.equals(other.timeSecs)) - return false; - if (uptimeSecs == null) { - if (other.uptimeSecs != null) - return false; - } else if (!uptimeSecs.equals(other.uptimeSecs)) - return false; - return true; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java deleted file mode 100644 index be66911..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.task.heartbeat; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.LinkedBlockingDeque; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; - -import com.alibaba.jstorm.callback.RunnableCallback; -import com.alibaba.jstorm.cluster.StormClusterState; -import com.alibaba.jstorm.daemon.worker.WorkerData; -import com.alibaba.jstorm.schedule.Assignment.AssignmentType; -import com.alibaba.jstorm.task.Task; -import com.alibaba.jstorm.task.UptimeComputer; -import com.alibaba.jstorm.utils.JStormUtils; -import com.alibaba.jstorm.utils.TimeUtils; - -/** - * Task hearbeat - * - * @author yannian - * - */ -public class TaskHeartbeatRunable extends RunnableCallback { - private static final Logger LOG = LoggerFactory - .getLogger(TaskHeartbeatRunable.class); - - private StormClusterState zkCluster; - private String topology_id; - private UptimeComputer uptime; - private Map storm_conf; - private Integer frequence; - private Map<Integer, Long> taskAssignTsMap = new HashMap<Integer, Long>(); - - private static Map<Integer, TaskStats> taskStatsMap = - new HashMap<Integer, TaskStats>(); - private static LinkedBlockingDeque<Event> eventQueue = - new LinkedBlockingDeque<TaskHeartbeatRunable.Event>(); - - public static void registerTaskStats(int taskId, TaskStats taskStats) { - Event event = new Event(Event.REGISTER_TYPE, taskId, taskStats); - eventQueue.offer(event); - } - - public static void unregisterTaskStats(int taskId) { - Event event = new Event(Event.UNREGISTER_TYPE, taskId, null); - eventQueue.offer(event); - } - - public static void updateTaskHbStats(int taskId, Task taskData) { - Event event = new Event(Event.TASK_HEARTBEAT_TYPE, taskId, taskData); - eventQueue.offer(event); - } - - public TaskHeartbeatRunable(WorkerData workerData) { - - this.zkCluster = workerData.getZkCluster(); - this.topology_id = workerData.getTopologyId(); - this.uptime = new UptimeComputer(); - this.storm_conf = workerData.getStormConf(); - - String key = Config.TASK_HEARTBEAT_FREQUENCY_SECS; - Object time = storm_conf.get(key); - frequence = JStormUtils.parseInt(time, 10); - - } - - public void handle() throws InterruptedException { - Event event = eventQueue.take(); - while (event != null) { - switch (event.getType()) { - case Event.TASK_HEARTBEAT_TYPE: { - updateTaskHbStats(event); - break; - } - case Event.REGISTER_TYPE: { - Event<TaskStats> regEvent = event; - taskStatsMap.put(event.getTaskId(), regEvent.getEventValue()); - taskAssignTsMap.put(event.getTaskId(), - System.currentTimeMillis()); - break; - } - case Event.UNREGISTER_TYPE: { - taskStatsMap.remove(event.getTaskId()); - taskAssignTsMap.remove(event.getTaskId()); - break; - } - default: { - LOG.warn("Unknown event type received:" + event.getType()); - break; - } - } - - event = eventQueue.take(); - } - } - - @Override - public void run() { - try { - handle(); - } catch (InterruptedException e) { - LOG.info(e.getMessage()); - } - } - - @Override - public Object getResult() { - return frequence; - } - - public void updateTaskHbStats(Event event) { - Integer currtime = TimeUtils.current_time_secs(); - Event<Task> taskHbEvent = event; - int taskId = taskHbEvent.getTaskId(); - String idStr = " " + topology_id + ":" + taskId + " "; - - try { - - TaskHeartbeat hb = new TaskHeartbeat(currtime, uptime.uptime()); - zkCluster.task_heartbeat(topology_id, taskId, hb); - - LOG.info("update task hearbeat ts " + currtime + " for" + idStr); - - // Check if assignment is changed. e.g scale-out - Task task = taskHbEvent.getEventValue(); - Long timeStamp = taskAssignTsMap.get(taskId); - if (timeStamp != null) { - if (timeStamp < task.getWorkerAssignmentTs() && - task.getWorkerAssignmentType().equals(AssignmentType.Assign)) { - LOG.info("Start to update the task data for task-" + taskId); - task.updateTaskData(); - taskAssignTsMap.put(taskId, task.getWorkerAssignmentTs()); - } - } - } catch (Exception e) { - // TODO Auto-generated catch block - String errMsg = "Failed to update heartbeat to ZK " + idStr + "\n"; - LOG.error(errMsg, e); - } - } - - private static class Event<T> { - public static final int REGISTER_TYPE = 0; - public static final int UNREGISTER_TYPE = 1; - public static final int TASK_HEARTBEAT_TYPE = 2; - private final int type; - private final int taskId; - private final T value; - - public Event(int type, int taskId, T value) { - this.type = type; - this.taskId = taskId; - this.value = value; - } - - public int getType() { - return type; - } - - public int getTaskId() { - return taskId; - } - - public T getEventValue() { - return value; - } - - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java new file mode 100644 index 0000000..86d72f4 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.task.heartbeat; + +import backtype.storm.generated.TaskHeartbeat; +import backtype.storm.generated.TopologyTaskHbInfo; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.NimbusClient; + +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.cluster.StormClusterState; +import com.alibaba.jstorm.task.UptimeComputer; +import com.alibaba.jstorm.utils.TimeUtils; + +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Update the task heartbeat information of topology to Nimbus + * + * @author Basti Liu + * + */ +public class TaskHeartbeatUpdater{ + private static final Logger LOG = LoggerFactory + .getLogger(TaskHeartbeatUpdater.class); + + private int MAX_NUM_TASK_HB_SEND; + + private String topologyId; + private int taskId; + + private Map conf; + private NimbusClient client; + + private Map<Integer, TaskHeartbeat> taskHbMap; + private TopologyTaskHbInfo taskHbs; + + private StormClusterState zkCluster; + + public TaskHeartbeatUpdater(Map conf, String topologyId, int taskId, StormClusterState zkCluster) { + this.topologyId = topologyId; + this.taskId = taskId; + + this.conf = conf; + this.client = NimbusClient.getConfiguredClient(conf); + + this.zkCluster = zkCluster; + + try { + TopologyTaskHbInfo taskHbInfo = zkCluster.topology_heartbeat(topologyId); + if (taskHbInfo != null) { + LOG.info("Found task heartbeat info left in zk for " + topologyId + ": " + taskHbInfo.toString()); + this.taskHbs = taskHbInfo; + this.taskHbMap = taskHbInfo.get_taskHbs(); + if (this.taskHbMap == null) { + this.taskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>(); + taskHbs.set_taskHbs(this.taskHbMap); + } + this.taskHbs.set_topologyId(topologyId); + this.taskHbs.set_topologyMasterId(this.taskId); + } else { + LOG.info("There is not any previous task heartbeat info left in zk for " + topologyId); + this.taskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>(); + this.taskHbs = new TopologyTaskHbInfo(this.topologyId, this.taskId); + this.taskHbs.set_taskHbs(taskHbMap); + } + } catch (Exception e) { + LOG.warn("Failed to get topology heartbeat from zk", e); + } + this.MAX_NUM_TASK_HB_SEND = ConfigExtension.getTopologyTaskHbSendNumber(conf); + } + + public void process(Tuple input) { + int sourceTask = input.getSourceTask(); + int uptime = (Integer) input.getValue(0); + + // Update the heartbeat for source task + TaskHeartbeat taskHb = taskHbMap.get(sourceTask); + if (taskHb == null) { + taskHb = new TaskHeartbeat(TimeUtils.current_time_secs(), uptime); + taskHbMap.put(sourceTask, taskHb); + } else { + taskHb.set_time(TimeUtils.current_time_secs()); + taskHb.set_uptime(uptime); + } + + // Send heartbeat info of all tasks to nimbus + if (sourceTask == taskId) { + // Send heartbeat info of MAX_NUM_TASK_HB_SEND tasks each time + TopologyTaskHbInfo tmpTaskHbInfo = new TopologyTaskHbInfo(topologyId, taskId); + Map<Integer, TaskHeartbeat> tmpTaskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>(); + tmpTaskHbInfo.set_taskHbs(tmpTaskHbMap); + + int sendCount = 0; + for (Entry<Integer, TaskHeartbeat> entry : taskHbMap.entrySet()) { + tmpTaskHbMap.put(entry.getKey(), entry.getValue()); + sendCount++; + + if (sendCount >= MAX_NUM_TASK_HB_SEND) { + setTaskHeatbeat(tmpTaskHbInfo); + tmpTaskHbMap.clear(); + sendCount = 0; + } + } + if (tmpTaskHbMap.size() > 0) { + setTaskHeatbeat(tmpTaskHbInfo); + } + } + } + + private void setTaskHeatbeat(TopologyTaskHbInfo topologyTaskHbInfo) { + try { + if (topologyTaskHbInfo == null) { + return; + } + if (topologyTaskHbInfo.get_taskHbs() == null) { + return; + } + + client.getClient().updateTaskHeartbeat(topologyTaskHbInfo); + + String info = ""; + for (Entry<Integer, TaskHeartbeat> entry : topologyTaskHbInfo.get_taskHbs().entrySet()) { + info += " " + entry.getKey() + "-" + entry.getValue().get_time(); + } + LOG.info("Update task heartbeat:" + info); + } catch (TException e) { + LOG.error("Failed to update task heartbeat info", e); + if (client != null) { + client.close(); + client = NimbusClient.getConfiguredClient(conf); + } + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java new file mode 100644 index 0000000..adc8dc0 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.task.master; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Definition of control event which is used for the control purpose in + * topology, e.g. back pressure + * + * @author Basti Liu + */ + +public class TopoMasterCtrlEvent implements Serializable { + + private static final long serialVersionUID = 5929540385279089750L; + + public enum EventType { + startBackpressure, stopBackpressure, syncBackpressureState, updateBackpressureConfig, defaultType + } + + private EventType eventType; + private List<Object> eventValue; + + public TopoMasterCtrlEvent() { + eventType = EventType.defaultType; + eventValue = null; + } + + public TopoMasterCtrlEvent(EventType type, List<Object> value) { + this.eventType = type; + this.eventValue = value; + } + + public EventType getEventType() { + return eventType; + } + + public void setEventType(EventType type) { + this.eventType = type; + } + + public List<Object> getEventValue() { + return eventValue; + } + + public void setEventValue(List<Object> value) { + this.eventValue = value; + } + + public void addEventValue(Object value) { + if (eventValue == null) { + eventValue = new ArrayList<Object>(); + } + + eventValue.add(value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java new file mode 100644 index 0000000..b5fb22c --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.task.master; + +import backtype.storm.generated.*; +import backtype.storm.task.IBolt; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IDynamicComponent; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.NimbusClient; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.cluster.StormClusterState; +import com.alibaba.jstorm.cluster.StormConfig; +import com.alibaba.jstorm.metric.MetaType; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.metric.TopologyMetricContext; +import com.alibaba.jstorm.schedule.Assignment; +import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; +import com.alibaba.jstorm.task.backpressure.BackpressureCoordinator; +import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatUpdater; +import com.alibaba.jstorm.utils.IntervalCheck; +import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; +import com.google.common.collect.Maps; +import org.slf4j.Logger; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Topology master is responsible for the process of general topology + * information, e.g. task heartbeat update, metrics data update.... + * + * @author Basti Liu + */ +public class TopologyMaster implements IBolt, IDynamicComponent { + + private static final long serialVersionUID = 4690656768333833626L; + + private static final Logger LOG = getLogger(TopologyMaster.class); + private final Logger metricLogger = getLogger(TopologyMetricContext.class); + + public static final int MAX_BATCH_SIZE = 10000; + private final MetricInfo dummy = MetricUtils.mkMetricInfo(); + + public static final String FIELD_METRIC_WORKER = "worker"; + public static final String FIELD_METRIC_METRICS = "metrics"; + public static final String FILED_HEARBEAT_EVENT = "hbEvent"; + public static final String FILED_CTRL_EVENT = "ctrlEvent"; + + private Map conf; + private StormClusterState zkCluster; + private OutputCollector collector; + + private int taskId; + private String topologyId; + private volatile Set<ResourceWorkerSlot> workerSet; + private IntervalCheck intervalCheck; + + private TaskHeartbeatUpdater taskHeartbeatUpdater; + + private BackpressureCoordinator backpressureCoordinator; + + private TopologyMetricContext topologyMetricContext; + + private ScheduledExecutorService uploadMetricsExecutor; + + private Thread updateThread; + private BlockingQueue<Tuple> queue = new LinkedBlockingDeque<Tuple>(); + private IntervalCheck threadAliveCheck; + + private volatile boolean isActive = true; + + private class TopologyMasterRunnable implements Runnable { + @Override + public void run() { + while (isActive) { + try { + Tuple event = queue.take(); + if (event != null) { + eventHandle(event); + } + } catch (Throwable e) { + LOG.error("Failed to process event", e); + } + } + } + + } + @Override + public void prepare(Map stormConf, TopologyContext context, + OutputCollector collector) { + this.conf = context.getStormConf(); + this.collector = collector; + this.taskId = context.getThisTaskId(); + this.topologyId = context.getTopologyId(); + this.zkCluster = context.getZkCluster(); + + try { + Assignment assignment = zkCluster.assignment_info(topologyId, null); + this.workerSet = assignment.getWorkers(); + intervalCheck = new IntervalCheck(); + intervalCheck.setInterval(10); + intervalCheck.start(); + } catch (Exception e) { + LOG.error("Failed to get assignment for " + topologyId); + } + + this.taskHeartbeatUpdater = new TaskHeartbeatUpdater(this.conf, topologyId, taskId, zkCluster); + + this.backpressureCoordinator = new BackpressureCoordinator(collector, context, taskId); + + this.topologyMetricContext = new TopologyMetricContext(topologyId, this.workerSet, this.conf); + + this.uploadMetricsExecutor = Executors.newSingleThreadScheduledExecutor(); + this.uploadMetricsExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + int secOffset = TimeUtils.secOffset(); + int offset = 35; + if (secOffset < offset) { + JStormUtils.sleepMs((offset - secOffset) * 1000); + } else if (secOffset == offset) { + // do nothing + } else { + JStormUtils.sleepMs((60 - secOffset + offset) * 1000); + } + if (topologyMetricContext.getUploadedWorkerNum() > 0) { + metricLogger.info("force upload metrics."); + mergeAndUpload(); + } + } + }, 5, 60, TimeUnit.SECONDS); + + updateThread = new Thread(new TopologyMasterRunnable()); + updateThread.start(); + + threadAliveCheck = new IntervalCheck(); + threadAliveCheck.setInterval(30); + threadAliveCheck.start(); + } + + @Override + public void execute(Tuple input) { + if (input != null) { + + try { + queue.put(input); + } catch (InterruptedException e) { + LOG.error("Failed to put event to taskHb updater's queue", e); + } + + if (threadAliveCheck.check()) { + if (updateThread == null || updateThread.isAlive() == false) { + updateThread = new Thread(new TopologyMasterRunnable()); + updateThread.start(); + } + } + + collector.ack(input); + } else { + LOG.error("Received null tuple!"); + } + } + + @Override + public void cleanup() { + isActive = false; + LOG.info("Successfully cleanup"); + } + + private void updateTopologyWorkerSet() { + if (intervalCheck.check()) { + Assignment assignment; + try { + assignment = zkCluster.assignment_info(topologyId, null); + this.workerSet = assignment.getWorkers(); + } catch (Exception e) { + LOG.error("Failed to get assignment for " + topologyId); + } + + } + } + + private void eventHandle(Tuple input) { + updateTopologyWorkerSet(); + + String stream = input.getSourceStreamId(); + + try { + if (stream.equals(Common.TOPOLOGY_MASTER_HB_STREAM_ID)) { + taskHeartbeatUpdater.process(input); + } else if (stream.equals(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID)) { + updateMetrics(input); + } else if (stream.equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) { + backpressureCoordinator.process(input); + } + } catch (Exception e) { + LOG.error("Failed to handle event: " + input.toString(), e); + } + } + + @Override + public void update(Map conf) { + LOG.info("Topology master received new conf:" + conf); + + if (backpressureCoordinator.isBackpressureConfigChange(conf)) { + backpressureCoordinator.updateBackpressureConfig(conf); + } + } + + private void updateMetrics(Tuple input) { + String workerSlot = (String) input.getValueByField(FIELD_METRIC_WORKER); + WorkerUploadMetrics metrics = (WorkerUploadMetrics) input.getValueByField(FIELD_METRIC_METRICS); + topologyMetricContext.addToMemCache(workerSlot, metrics.get_allMetrics()); + metricLogger.info("received metrics from:{}, size:{}", workerSlot, metrics.get_allMetrics().get_metrics_size()); + + if (topologyMetricContext.readyToUpload()) { + metricLogger.info("all {} worker slots have updated metrics, start merging & uploading...", + topologyMetricContext.getWorkerNum()); + uploadMetricsExecutor.submit(new Runnable() { + @Override + public void run() { + mergeAndUpload(); + } + }); + } + } + + private void mergeAndUpload() { + // double check + if (topologyMetricContext.getUploadedWorkerNum() > 0) { + TopologyMetric tpMetric = topologyMetricContext.mergeMetrics(); + if (tpMetric != null) { + uploadMetrics(tpMetric); + } + + topologyMetricContext.resetUploadedMetrics(); + //MetricUtils.logMetrics(tpMetric.get_componentMetric()); + } + } + + /** + * upload metrics sequentially due to thrift frame size limit (15MB) + */ + private void uploadMetrics(TopologyMetric tpMetric) { + long start = System.currentTimeMillis(); + if (StormConfig.local_mode(conf)) { + return; + } else { + NimbusClient client = null; + try { + client = NimbusClient.getConfiguredClient(conf); + Nimbus.Client client1 = client.getClient(); + + MetricInfo topologyMetrics = tpMetric.get_topologyMetric(); + MetricInfo componentMetrics = tpMetric.get_componentMetric(); + MetricInfo taskMetrics = tpMetric.get_taskMetric(); + MetricInfo streamMetrics = tpMetric.get_streamMetric(); + MetricInfo workerMetrics = tpMetric.get_workerMetric(); + MetricInfo nettyMetrics = tpMetric.get_nettyMetric(); + + int totalSize = topologyMetrics.get_metrics_size() + componentMetrics.get_metrics_size() + + taskMetrics.get_metrics_size() + streamMetrics.get_metrics_size() + + workerMetrics.get_metrics_size() + nettyMetrics.get_metrics_size(); + + // for small topologies, send all metrics together to ease the pressure of nimbus + if (totalSize < MAX_BATCH_SIZE) { + client1.uploadTopologyMetrics(topologyId, + new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics, taskMetrics, + streamMetrics, nettyMetrics)); + } else { + client1.uploadTopologyMetrics(topologyId, + new TopologyMetric(topologyMetrics, componentMetrics, dummy, dummy, dummy, dummy)); + batchUploadMetrics(client1, topologyId, workerMetrics, MetaType.WORKER); + batchUploadMetrics(client1, topologyId, taskMetrics, MetaType.TASK); + batchUploadMetrics(client1, topologyId, streamMetrics, MetaType.STREAM); + batchUploadMetrics(client1, topologyId, nettyMetrics, MetaType.NETTY); + } + } catch (Exception e) { + LOG.error("Failed to upload worker metrics", e); + } finally { + if (client != null) { + client.close(); + } + } + } + metricLogger.info("upload metrics, cost:{}", System.currentTimeMillis() - start); + } + + private void batchUploadMetrics(Nimbus.Client client, String topologyId, MetricInfo metricInfo, MetaType metaType) { + if (metricInfo.get_metrics_size() > MAX_BATCH_SIZE) { + Map<String, Map<Integer, MetricSnapshot>> data = metricInfo.get_metrics(); + + Map<String, Map<Integer, MetricSnapshot>> part = Maps.newHashMapWithExpectedSize(MAX_BATCH_SIZE); + MetricInfo uploadPart = new MetricInfo(); + int i = 0; + for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : data.entrySet()) { + part.put(entry.getKey(), entry.getValue()); + if (++i >= MAX_BATCH_SIZE) { + uploadPart.set_metrics(part); + doUpload(client, topologyId, uploadPart, metaType); + + i = 0; + part.clear(); + } + } + if (part.size() > 0) { + uploadPart.set_metrics(part); + doUpload(client, topologyId, uploadPart, metaType); + } + } else { + doUpload(client, topologyId, metricInfo, metaType); + } + } + + private void doUpload(Nimbus.Client client, String topologyId, MetricInfo part, MetaType metaType) { + try { + if (metaType == MetaType.TASK) { + client.uploadTopologyMetrics(topologyId, + new TopologyMetric(dummy, dummy, dummy, part, dummy, dummy)); + } else if (metaType == MetaType.STREAM) { + client.uploadTopologyMetrics(topologyId, + new TopologyMetric(dummy, dummy, dummy, dummy, part, dummy)); + } else if (metaType == MetaType.WORKER) { + client.uploadTopologyMetrics(topologyId, + new TopologyMetric(dummy, dummy, part, dummy, dummy, dummy)); + } else if (metaType == MetaType.NETTY) { + client.uploadTopologyMetrics(topologyId, + new TopologyMetric(dummy, dummy, dummy, dummy, dummy, part)); + } + } catch (Exception ex) { + LOG.error("Error", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java index 4a4a72b..8f0138a 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java @@ -1,4 +1,4 @@ -/** +package com.alibaba.jstorm.utils; /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java index 17b7885..161156b 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java @@ -17,67 +17,59 @@ */ package com.alibaba.jstorm.utils; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.utils.DisruptorQueue; - import com.alibaba.jstorm.callback.AsyncLoopRunnable; import com.alibaba.jstorm.callback.RunnableCallback; -import com.alibaba.jstorm.common.metric.QueueGauge; -import com.alibaba.jstorm.common.metric.Timer; -import com.alibaba.jstorm.metric.JStormHealthCheck; +import com.alibaba.jstorm.common.metric.*; import com.alibaba.jstorm.metric.JStormMetrics; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.metric.JStormHealthCheck; import com.alibaba.jstorm.metric.MetricDef; import com.lmax.disruptor.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; //import com.alibaba.jstorm.message.zeroMq.ISendConnection; /** - * * Disruptor Consumer thread * * @author yannian - * */ -public abstract class DisruptorRunable extends RunnableCallback implements - EventHandler { - private final static Logger LOG = LoggerFactory - .getLogger(DisruptorRunable.class); +public abstract class DisruptorRunable extends RunnableCallback implements EventHandler { + private final static Logger LOG = LoggerFactory.getLogger(DisruptorRunable.class); protected DisruptorQueue queue; protected String idStr; - protected Timer timer; + protected AsmHistogram timer; protected AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown(); public DisruptorRunable(DisruptorQueue queue, String idStr) { this.queue = queue; - this.timer = - JStormMetrics.registerWorkerTimer(idStr + MetricDef.TIME_TYPE); this.idStr = idStr; - QueueGauge queueGauge = - new QueueGauge(idStr + MetricDef.QUEUE_TYPE, queue); - JStormMetrics.registerWorkerGauge(queueGauge, idStr - + MetricDef.QUEUE_TYPE); + this.timer = + (AsmHistogram) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.TIME_TYPE, MetricType.HISTOGRAM), + new AsmHistogram()); + + QueueGauge queueGauge = new QueueGauge(queue, idStr, MetricDef.QUEUE_TYPE); + JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.QUEUE_TYPE, MetricType.GAUGE), new AsmGauge(queueGauge)); JStormHealthCheck.registerWorkerHealthCheck(idStr, queueGauge); } - public abstract void handleEvent(Object event, boolean endOfBatch) - throws Exception; + public abstract void handleEvent(Object event, boolean endOfBatch) throws Exception; /** * This function need to be implements * - * @see com.lmax.disruptor.EventHandler#onEvent(java.lang.Object, long, - * boolean) + * @see EventHandler#onEvent(Object, long, boolean) */ @Override - public void onEvent(Object event, long sequence, boolean endOfBatch) - throws Exception { + public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception { if (event == null) { return; } @@ -87,7 +79,7 @@ public abstract class DisruptorRunable extends RunnableCallback implements handleEvent(event, endOfBatch); } finally { long end = System.nanoTime(); - timer.update((end - start)/1000000.0d); + timer.update((end - start) / TimeUtils.NS_PER_US); } } @@ -96,17 +88,15 @@ public abstract class DisruptorRunable extends RunnableCallback implements LOG.info("Successfully start thread " + idStr); queue.consumerStarted(); - while (shutdown.get() == false) { + while (!shutdown.get()) { queue.consumeBatchWhenAvailable(this); - } - LOG.info("Successfully exit thread " + idStr); } @Override public void shutdown() { - JStormMetrics.unregisterWorkerMetric(idStr + MetricDef.QUEUE_TYPE); + JStormMetrics.unregisterWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.QUEUE_TYPE, MetricType.GAUGE)); JStormHealthCheck.unregisterWorkerHealthCheck(idStr); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java index 8c62d6f..2773963 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java @@ -17,34 +17,18 @@ */ package com.alibaba.jstorm.utils; -public enum EPlatform { - Any("any"), - Linux("Linux"), - Mac_OS("Mac OS"), - Mac_OS_X("Mac OS X"), - Windows("Windows"), - OS2("OS/2"), - Solaris("Solaris"), - SunOS("SunOS"), - MPEiX("MPE/iX"), - HP_UX("HP-UX"), - AIX("AIX"), - OS390("OS/390"), - FreeBSD("FreeBSD"), - Irix("Irix"), - Digital_Unix("Digital Unix"), - NetWare_411("NetWare"), - OSF1("OSF1"), - OpenVMS("OpenVMS"), - Others("Others"); - - private EPlatform(String desc){ - this.description = desc; - } - - public String toString(){ - return description; - } - - private String description; -} +public enum EPlatform { + Any("any"), Linux("Linux"), Mac_OS("Mac OS"), Mac_OS_X("Mac OS X"), Windows("Windows"), OS2("OS/2"), Solaris("Solaris"), SunOS("SunOS"), MPEiX("MPE/iX"), HP_UX( + "HP-UX"), AIX("AIX"), OS390("OS/390"), FreeBSD("FreeBSD"), Irix("Irix"), Digital_Unix("Digital Unix"), NetWare_411("NetWare"), OSF1("OSF1"), OpenVMS( + "OpenVMS"), Others("Others"); + + private EPlatform(String desc) { + this.description = desc; + } + + public String toString() { + return description; + } + + private String description; +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java index e33167a..7099169 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java @@ -77,8 +77,7 @@ public class FileAttribute implements Serializable, JSONAware { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } @Override @@ -122,8 +121,7 @@ public class FileAttribute implements Serializable, JSONAware { String jsonString = JStormUtils.to_json(map); - Map<String, Map> map2 = - (Map<String, Map>) JStormUtils.from_json(jsonString); + Map<String, Map> map2 = (Map<String, Map>) JStormUtils.from_json(jsonString); Map jObject = map2.get("test"); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java index 20c1f7a..378ee26 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java @@ -29,8 +29,7 @@ public class HttpserverUtils { public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK = "jstack"; - public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF = - "showConf"; + public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF = "showConf"; public static final String HTTPSERVER_LOGVIEW_PARAM_LOGFILE = "log"; @@ -38,8 +37,7 @@ public class HttpserverUtils { public static final String HTTPSERVER_LOGVIEW_PARAM_DIR = "dir"; - public static final String HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT = - "workerPort"; + public static final String HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT = "workerPort"; public static final String HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT = "%016d\n"; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java index 992659c..de7b504 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java @@ -1,83 +1,74 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.utils; - -import java.io.Serializable; - -public class IntervalCheck implements Serializable { - - /** - * - */ - private static final long serialVersionUID = 8952971673547362883L; - - long lastCheck = System.currentTimeMillis(); - - // default interval is 1 second - long interval = 1000; - - /* - * if last check time is before interval seconds, return true, otherwise - * return false - */ - public boolean check() { - return checkAndGet() != null; - } - - /** - * - * @return - */ - public Double checkAndGet() { - long now = System.currentTimeMillis(); - - synchronized (this) { - if (now >= interval + lastCheck) { - double pastSecond = ((double) (now - lastCheck)) / 1000; - lastCheck = now; - return pastSecond; - } - } - - return null; - } - - public long getInterval() { - return interval/1000; - } - - public long getIntervalMs() { - return interval; - } - - public void setInterval(long interval) { - this.interval = interval * 1000; - } - - public void setIntervalMs(long interval) { - this.interval = interval; - } - - public void adjust(long addTimeMillis) { - lastCheck += addTimeMillis; - } - - public void start() { - lastCheck = System.currentTimeMillis(); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.utils; + +import java.io.Serializable; + +public class IntervalCheck implements Serializable { + private static final long serialVersionUID = 8952971673547362883L; + + long lastCheck = System.currentTimeMillis(); + + // default interval is 1 second + long interval = 1000; + + /* + * if last check time is before interval seconds, return true, otherwise return false + */ + public boolean check() { + return checkAndGet() != null; + } + + public Double checkAndGet() { + long now = System.currentTimeMillis(); + + synchronized (this) { + if (now >= interval + lastCheck) { + double pastSecond = ((double) (now - lastCheck)) / 1000; + lastCheck = now; + return pastSecond; + } + } + + return null; + } + + public long getInterval() { + return interval / 1000; + } + + public long getIntervalMs() { + return interval; + } + + public void setInterval(long interval) { + this.interval = interval * 1000; + } + + public void setIntervalMs(long interval) { + this.interval = interval; + } + + public void adjust(long addTimeMillis) { + lastCheck += addTimeMillis; + } + + public void start() { + lastCheck = System.currentTimeMillis(); + } +}
