http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java index db2e990..7ef7144 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.task.TaskReceiver.DeserializeRunnable; import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.BatchTuple; @@ -32,27 +33,20 @@ import backtype.storm.tuple.Tuple; import backtype.storm.utils.DisruptorQueue; public class TaskBatchReceiver extends TaskReceiver { - private static Logger LOG = LoggerFactory - .getLogger(TaskBatchReceiver.class); + private static Logger LOG = LoggerFactory.getLogger(TaskBatchReceiver.class); - public TaskBatchReceiver(Task task, int taskId, Map stormConf, - TopologyContext topologyContext, - Map<Integer, DisruptorQueue> innerTaskTransfer, + public TaskBatchReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer, TaskStatus taskStatus, String taskName) { - super(task, taskId, stormConf, topologyContext, innerTaskTransfer, - taskStatus, taskName); + super(task, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName); } @Override protected void setDeserializeThread() { - this.deserializeThread = - new AsyncLoopThread(new DeserializeBatchRunnable( - deserializeQueue, innerTaskTransfer.get(taskId))); + this.deserializeThread = new AsyncLoopThread(new 
DeserializeBatchRunnable(deserializeQueue, innerTaskTransfer.get(taskId))); } public class DeserializeBatchRunnable extends DeserializeRunnable { - public DeserializeBatchRunnable(DisruptorQueue deserializeQueue, - DisruptorQueue exeQueue) { + public DeserializeBatchRunnable(DisruptorQueue deserializeQueue, DisruptorQueue exeQueue) { super(deserializeQueue, exeQueue); } @@ -83,14 +77,11 @@ public class TaskBatchReceiver extends TaskReceiver { return tuple; } catch (Throwable e) { if (taskStatus.isShutdown() == false) { - LOG.error( - idStr + " recv thread error " - + JStormUtils.toPrintableString(ser_msg) - + "\n", e); + LOG.error(idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e); } } finally { long end = System.nanoTime(); - deserializeTimer.update((end - start)/1000000.0d); + deserializeTimer.update((end - start) / TimeUtils.NS_PER_US); } return null;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java index e10fe96..07d7cbb 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java @@ -20,6 +20,7 @@ package com.alibaba.jstorm.task; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,13 +28,15 @@ import org.slf4j.LoggerFactory; import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.daemon.worker.WorkerData; +import com.alibaba.jstorm.daemon.worker.timer.TaskBatchCheckTrigger; +import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger; +import com.alibaba.jstorm.utils.EventSampler; +import com.alibaba.jstorm.utils.Pair; -import backtype.storm.messaging.IConnection; -import backtype.storm.messaging.TaskMessage; import backtype.storm.serialization.KryoTupleSerializer; import backtype.storm.tuple.BatchTuple; +import backtype.storm.tuple.ITupleExt; import backtype.storm.tuple.TupleExt; -import backtype.storm.utils.DisruptorQueue; /** * Batch Tuples, then send out @@ -43,62 +46,84 @@ import backtype.storm.utils.DisruptorQueue; */ public class TaskBatchTransfer extends TaskTransfer { - private static Logger LOG = LoggerFactory - .getLogger(TaskBatchTransfer.class); - + private static Logger LOG = LoggerFactory.getLogger(TaskBatchTransfer.class); + protected static final double BATCH_SIZE_THRESHOLD = 2.0; + protected static final int BATCH_FLUSH_INTERVAL_MS = 5; + protected static final int 
BATCH_CHECK_INTERVAL_S = 3600; + protected static final int BATCH_EVENT_SAMPLER_INTERVAL_S = 4 * 240; + private Map<Integer, BatchTuple> batchMap; + private final int maxBatchSize; private int batchSize; private Object lock = new Object(); + private EventSampler eventSampler = null; - public TaskBatchTransfer(Task task, String taskName, - KryoTupleSerializer serializer, TaskStatus taskStatus, - WorkerData workerData) { + public TaskBatchTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) { super(task, taskName, serializer, taskStatus, workerData); batchMap = new HashMap<Integer, BatchTuple>(); - batchSize = - ConfigExtension.getTaskMsgBatchSize(workerData.getStormConf()); + maxBatchSize = ConfigExtension.getTaskMsgBatchSize(workerData.getStormConf()); + + + TaskBatchFlushTrigger batchFlushTrigger = new TaskBatchFlushTrigger(BATCH_FLUSH_INTERVAL_MS, taskName, this); + batchFlushTrigger.register(TimeUnit.MILLISECONDS); + + TaskBatchCheckTrigger batchCheckTrigger = new TaskBatchCheckTrigger(BATCH_CHECK_INTERVAL_S, taskName, this); + batchCheckTrigger.register(); + + startCheck(); } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + LOG.info(taskName + " set batch size as " + batchSize); + } @Override protected AsyncLoopThread setupSerializeThread() { return new AsyncLoopThread(new TransferBatchRunnable()); } + + public void startCheck() { + eventSampler = new EventSampler(BATCH_EVENT_SAMPLER_INTERVAL_S); + setBatchSize(maxBatchSize); + LOG.info("Start check batch size, task of " + taskName); + } + + public void stopCheck() { + eventSampler = null; + LOG.info("Stop check batch size, task of " + taskName); + } - @Override - public void transfer(TupleExt tuple) { - int targetTaskid = tuple.getTargetTaskId(); - synchronized (lock) { - BatchTuple batch = getBatchTuple(targetTaskid); + @Override + public void push(int taskId, TupleExt tuple) { + synchronized (lock) { + BatchTuple batch 
= getBatchTuple(taskId); - batch.addToBatch(tuple); - if (batch.isBatchFull()) { - pushToQueue(targetTaskid, batch); - } - } - } + batch.addToBatch(tuple); + if (batch.isBatchFull()) { + serializeQueue.publish(batch); + batchMap.put(taskId, new BatchTuple(taskId, batchSize)); + } + } + + } public void flush() { + Map<Integer, BatchTuple> oldBatchMap = null; synchronized (lock) { - for (Entry<Integer, BatchTuple> entry : batchMap.entrySet()) { - int taskId = entry.getKey(); - BatchTuple batch = entry.getValue(); - if (batch != null && batch.currBatchSize() > 0) { - pushToQueue(taskId, batch); - } + oldBatchMap = batchMap; + batchMap = new HashMap<Integer, BatchTuple>(); + } + + for (Entry<Integer, BatchTuple> entry : oldBatchMap.entrySet()) { + BatchTuple batch = entry.getValue(); + if (batch != null && batch.currBatchSize() > 0) { + serializeQueue.publish(batch); } } } - private void pushToQueue(int targetTaskid, BatchTuple batch) { - DisruptorQueue exeQueue = innerTaskTransfer.get(targetTaskid); - if (exeQueue != null) { - exeQueue.publish(batch); - } else { - serializeQueue.publish(batch); - } - resetBatchTuple(targetTaskid); - } private BatchTuple getBatchTuple(int targetTaskId) { BatchTuple ret = batchMap.get(targetTaskId); @@ -109,33 +134,27 @@ public class TaskBatchTransfer extends TaskTransfer { return ret; } - private void resetBatchTuple(int targetTaskId) { - batchMap.put(targetTaskId, null); - } protected class TransferBatchRunnable extends TransferRunnable { - @Override - public void onEvent(Object event, long sequence, boolean endOfBatch) - throws Exception { - - if (event == null) { - return; - } - - long start = System.currentTimeMillis(); - try { - BatchTuple tuple = (BatchTuple) event; - int taskid = tuple.getTargetTaskId(); - byte[] tupleMessage = serializer.serializeBatch(tuple); - TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage); - IConnection conn = getConnection(taskid); - if (conn != null) - conn.send(taskMessage); - } finally { 
- long end = System.currentTimeMillis(); - timer.update(end - start); - } + public byte[] serialize(ITupleExt tuple) { + BatchTuple batchTuple = (BatchTuple)tuple; + if (eventSampler != null) { + Pair<Integer, Double> result = eventSampler.avgCheck(batchTuple.currBatchSize()); + if (result != null) { + Double avgBatchSize = result.getSecond(); + LOG.info(taskName + " batch average size is " + avgBatchSize); + if (avgBatchSize < BATCH_SIZE_THRESHOLD) { + LOG.info("Due to average size is small, so directly reset batch size as 1"); + // set the batch size as 1 + // transfer can directly send tuple, don't need wait flush interval + setBatchSize(1); + } + stopCheck(); + } + + } + return serializer.serializeBatch(batchTuple); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java index 0eb1c4b..ddfe63d 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java @@ -54,11 +54,8 @@ public class TaskInfo implements Serializable { @Override public boolean equals(Object assignment) { - if (assignment instanceof TaskInfo - && ((TaskInfo) assignment).getComponentId().equals( - getComponentId()) - && ((TaskInfo) assignment).getComponentType().equals( - componentType)) { + if (assignment instanceof TaskInfo && ((TaskInfo) assignment).getComponentId().equals(getComponentId()) + && ((TaskInfo) assignment).getComponentType().equals(componentType)) { return true; } return false; @@ -66,13 +63,11 @@ public class TaskInfo implements Serializable { @Override public int hashCode() { - return this.getComponentId().hashCode() - + this.getComponentType().hashCode(); + return this.getComponentId().hashCode() + 
this.getComponentType().hashCode(); } @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java index ad32ceb..230ba16 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java @@ -17,32 +17,32 @@ */ package com.alibaba.jstorm.task; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.serialization.KryoTupleDeserializer; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Tuple; import backtype.storm.utils.DisruptorQueue; -import backtype.storm.utils.Utils; import backtype.storm.utils.WorkerClassLoader; import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.common.metric.Histogram; +import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.common.metric.AsmHistogram; import com.alibaba.jstorm.common.metric.QueueGauge; -import com.alibaba.jstorm.metric.JStormHealthCheck; -import com.alibaba.jstorm.metric.JStormMetrics; -import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.metric.*; import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; +import com.esotericsoftware.kryo.KryoException; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WaitStrategy; import 
com.lmax.disruptor.dsl.ProducerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + public class TaskReceiver { private static Logger LOG = LoggerFactory.getLogger(TaskReceiver.class); @@ -58,13 +58,11 @@ public class TaskReceiver { protected DisruptorQueue deserializeQueue; protected KryoTupleDeserializer deserializer; protected AsyncLoopThread deserializeThread; - protected Histogram deserializeTimer; + protected AsmHistogram deserializeTimer; protected TaskStatus taskStatus; - public TaskReceiver(Task task, int taskId, Map stormConf, - TopologyContext topologyContext, - Map<Integer, DisruptorQueue> innerTaskTransfer, + public TaskReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer, TaskStatus taskStatus, String taskName) { this.task = task; this.taskId = taskId; @@ -77,34 +75,24 @@ public class TaskReceiver { this.isDebugRecv = ConfigExtension.isTopologyDebugRecvTuple(stormConf); - int queueSize = - JStormUtils - .parseInt( - stormConf - .get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), - 256); - - WaitStrategy waitStrategy = - (WaitStrategy) JStormUtils - .createDisruptorWaitStrategy(stormConf); - this.deserializeQueue = - DisruptorQueue.mkInstance("TaskDeserialize", - ProducerType.MULTI, queueSize, waitStrategy); + int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256); + + WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf); + this.deserializeQueue = DisruptorQueue.mkInstance("TaskDeserialize", ProducerType.MULTI, queueSize, waitStrategy); setDeserializeThread(); - this.deserializer = - new KryoTupleDeserializer(stormConf, topologyContext); + this.deserializer = new KryoTupleDeserializer(stormConf, topologyContext); + + String topologyId = topologyContext.getTopologyId(); + String component = topologyContext.getThisComponentId(); deserializeTimer = - 
JStormMetrics.registerTaskHistogram(taskId, - MetricDef.DESERIALIZE_TIME); - - QueueGauge deserializeQueueGauge = - new QueueGauge(idStr + MetricDef.DESERIALIZE_QUEUE, - deserializeQueue); - JStormMetrics.registerTaskGauge(deserializeQueueGauge, taskId, - MetricDef.DESERIALIZE_QUEUE); - JStormHealthCheck.registerTaskHealthCheck(taskId, - MetricDef.DESERIALIZE_QUEUE, deserializeQueueGauge); + (AsmHistogram) JStormMetrics.registerTaskMetric( + MetricUtils.taskMetricName(topologyId, component, taskId, MetricDef.DESERIALIZE_TIME, MetricType.HISTOGRAM), new AsmHistogram()); + + QueueGauge deserializeQueueGauge = new QueueGauge(deserializeQueue, idStr, MetricDef.DESERIALIZE_QUEUE); + JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, component, taskId, MetricDef.DESERIALIZE_QUEUE, MetricType.GAUGE), + new AsmGauge(deserializeQueueGauge)); + JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.DESERIALIZE_QUEUE, deserializeQueueGauge); } public AsyncLoopThread getDeserializeThread() { @@ -112,9 +100,7 @@ public class TaskReceiver { } protected void setDeserializeThread() { - this.deserializeThread = - new AsyncLoopThread(new DeserializeRunnable(deserializeQueue, - innerTaskTransfer.get(taskId))); + this.deserializeThread = new AsyncLoopThread(new DeserializeRunnable(deserializeQueue, innerTaskTransfer.get(taskId))); } public DisruptorQueue getDeserializeQueue() { @@ -126,8 +112,7 @@ public class TaskReceiver { DisruptorQueue deserializeQueue; DisruptorQueue exeQueue; - DeserializeRunnable(DisruptorQueue deserializeQueue, - DisruptorQueue exeQueue) { + DeserializeRunnable(DisruptorQueue deserializeQueue, DisruptorQueue exeQueue) { this.deserializeQueue = deserializeQueue; this.exeQueue = exeQueue; } @@ -162,24 +147,22 @@ public class TaskReceiver { } return tuple; + } catch (KryoException e) { + throw new RuntimeException(e); } catch (Throwable e) { if (taskStatus.isShutdown() == false) { - LOG.error( - idStr + " recv thread error " - + 
JStormUtils.toPrintableString(ser_msg) - + "\n", e); + LOG.error(idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e); } } finally { long end = System.nanoTime(); - deserializeTimer.update((end - start)/1000000.0d); + deserializeTimer.update((end - start) / TimeUtils.NS_PER_US); } return null; } @Override - public void onEvent(Object event, long sequence, boolean endOfBatch) - throws Exception { + public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception { Object tuple = deserialize((byte[]) event); if (tuple != null) { @@ -189,7 +172,7 @@ public class TaskReceiver { @Override public void preRun() { - WorkerClassLoader.switchThreadContext(); + WorkerClassLoader.switchThreadContext(); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java index c49e9fc..d685c04 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java @@ -17,36 +17,31 @@ */ package com.alibaba.jstorm.task; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.spout.ISpout; import backtype.storm.task.IBolt; -import backtype.storm.topology.IConfig; +import backtype.storm.topology.IDynamicComponent; import backtype.storm.utils.WorkerClassLoader; - import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.daemon.worker.ShutdownableDameon; -import com.alibaba.jstorm.metric.JStormMetrics; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable; import 
com.alibaba.jstorm.utils.JStormUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; /** * shutdown one task * * @author yannian/Longda - * */ public class TaskShutdownDameon implements ShutdownableDameon { - private static Logger LOG = LoggerFactory - .getLogger(TaskShutdownDameon.class); + private static Logger LOG = LoggerFactory.getLogger(TaskShutdownDameon.class); public static final byte QUIT_MSG = (byte) 0xff; + private Task task; private TaskStatus taskStatus; private String topology_id; private Integer task_id; @@ -55,16 +50,15 @@ public class TaskShutdownDameon implements ShutdownableDameon { private Object task_obj; private boolean isClosed = false; - public TaskShutdownDameon(TaskStatus taskStatus, String topology_id, - Integer task_id, List<AsyncLoopThread> all_threads, - StormClusterState zkCluster, Object task_obj) { + public TaskShutdownDameon(TaskStatus taskStatus, String topology_id, Integer task_id, List<AsyncLoopThread> all_threads, StormClusterState zkCluster, + Object task_obj, Task task) { this.taskStatus = taskStatus; this.topology_id = topology_id; this.task_id = task_id; this.all_threads = all_threads; this.zkCluster = zkCluster; this.task_obj = task_obj; - + this.task = task; } @Override @@ -104,18 +98,9 @@ public class TaskShutdownDameon implements ShutdownableDameon { closeComponent(task_obj); try { - JStormMetrics.unregisterTask(task_id); - TaskHeartbeatRunable.unregisterTaskStats(task_id); - zkCluster.remove_task_heartbeat(topology_id, task_id); + zkCluster.disconnect(); } catch (Exception e) { - // TODO Auto-generated catch block - LOG.info("Failed to cleanup"); - } finally { - try { - zkCluster.disconnect(); - } catch (Exception e) { - LOG.info("Failed to disconnect", e); - } + LOG.error("Failed to disconnect zk for task-" + task_id); } LOG.info("Successfully shutdown task " + topology_id + ":" + task_id); @@ -170,19 +155,22 @@ public class TaskShutdownDameon implements 
ShutdownableDameon { } } - public void updateConf(Map conf) { - if (task_obj instanceof IConfig) { - ((IConfig) task_obj).updateConf(conf); + public void update(Map conf) { + if (task_obj instanceof IDynamicComponent) { + ((IDynamicComponent) task_obj).update(conf); } } @Override public void run() { - // TODO Auto-generated method stub shutdown(); } public int getTaskId() { return this.task_id; } + + public Task getTask() { + return this.task; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java index efe6dee..4da4330 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java @@ -24,40 +24,44 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.Config; -import backtype.storm.messaging.IConnection; -import backtype.storm.messaging.TaskMessage; -import backtype.storm.scheduler.WorkerSlot; -import backtype.storm.serialization.KryoTupleSerializer; -import backtype.storm.tuple.TupleExt; -import backtype.storm.utils.DisruptorQueue; -import backtype.storm.utils.Utils; -import backtype.storm.utils.WorkerClassLoader; - import com.alibaba.jstorm.callback.AsyncLoopRunnable; import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.callback.RunnableCallback; -import com.alibaba.jstorm.common.metric.MetricRegistry; +import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.common.metric.AsmHistogram; import com.alibaba.jstorm.common.metric.QueueGauge; -import com.alibaba.jstorm.common.metric.Timer; import com.alibaba.jstorm.daemon.worker.WorkerData; import 
com.alibaba.jstorm.metric.JStormHealthCheck; import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.task.backpressure.BackpressureController; import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import backtype.storm.Config; +import backtype.storm.messaging.IConnection; +import backtype.storm.messaging.TaskMessage; +import backtype.storm.scheduler.WorkerSlot; +import backtype.storm.serialization.KryoTupleSerializer; +import backtype.storm.tuple.ITupleExt; +import backtype.storm.tuple.TupleExt; +import backtype.storm.utils.DisruptorQueue; +import backtype.storm.utils.Utils; +import backtype.storm.utils.WorkerClassLoader; + /** * Sending entrance - * + * <p/> * Task sending all tuples through this Object - * + * <p/> * Serialize the Tuple and put the serialized data to the sending queue * * @author yannian - * */ public class TaskTransfer { @@ -71,15 +75,18 @@ public class TaskTransfer { protected final AsyncLoopThread serializeThread; protected volatile TaskStatus taskStatus; protected String taskName; - protected Timer timer; + protected AsmHistogram serializeTimer; protected Task task; - + protected String topolgyId; + protected String componentId; + protected int taskId; + protected ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket; protected ConcurrentHashMap<Integer, WorkerSlot> taskNodeport; - public TaskTransfer(Task task, String taskName, - KryoTupleSerializer serializer, TaskStatus taskStatus, - WorkerData workerData) { + protected BackpressureController backpressureController; + + public TaskTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) { this.task = task; 
this.taskName = taskName; this.serializer = serializer; @@ -87,49 +94,53 @@ public class TaskTransfer { this.storm_conf = workerData.getStormConf(); this.transferQueue = workerData.getTransferQueue(); this.innerTaskTransfer = workerData.getInnerTaskTransfer(); - + this.nodeportSocket = workerData.getNodeportSocket(); this.taskNodeport = workerData.getTaskNodeport(); - int queue_size = - Utils.getInt(storm_conf - .get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE)); - WaitStrategy waitStrategy = - (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf); - this.serializeQueue = - DisruptorQueue.mkInstance(taskName, ProducerType.MULTI, - queue_size, waitStrategy); + this.topolgyId = workerData.getTopologyId(); + this.componentId = this.task.getComponentId(); + this.taskId = this.task.getTaskId(); + + int queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE)); + WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf); + this.serializeQueue = DisruptorQueue.mkInstance(taskName, ProducerType.MULTI, queue_size, waitStrategy); this.serializeQueue.consumerStarted(); String taskId = taskName.substring(taskName.indexOf(":") + 1); - String metricName = - MetricRegistry.name(MetricDef.SERIALIZE_QUEUE, taskName); - QueueGauge serializeQueueGauge = - new QueueGauge(metricName, serializeQueue); - JStormMetrics.registerTaskGauge(serializeQueueGauge, - Integer.valueOf(taskId), MetricDef.SERIALIZE_QUEUE); - JStormHealthCheck.registerTaskHealthCheck(Integer.valueOf(taskId), - MetricDef.SERIALIZE_QUEUE, serializeQueueGauge); - timer = - JStormMetrics.registerTaskTimer(Integer.valueOf(taskId), - MetricDef.SERIALIZE_TIME); + QueueGauge serializeQueueGauge = new QueueGauge(serializeQueue, taskName, MetricDef.SERIALIZE_QUEUE); + JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topolgyId, componentId, this.taskId, MetricDef.SERIALIZE_QUEUE, MetricType.GAUGE), + new AsmGauge(serializeQueueGauge)); + 
JStormHealthCheck.registerTaskHealthCheck(Integer.valueOf(taskId), MetricDef.SERIALIZE_QUEUE, serializeQueueGauge); + serializeTimer = + (AsmHistogram) JStormMetrics.registerTaskMetric( + MetricUtils.taskMetricName(topolgyId, componentId, this.taskId, MetricDef.SERIALIZE_TIME, MetricType.HISTOGRAM), new AsmHistogram()); serializeThread = setupSerializeThread(); + + backpressureController = new BackpressureController(storm_conf, task.getTaskId(), serializeQueue, queue_size); LOG.info("Successfully start TaskTransfer thread"); } public void transfer(TupleExt tuple) { - int taskid = tuple.getTargetTaskId(); + int taskId = tuple.getTargetTaskId(); - DisruptorQueue exeQueue = innerTaskTransfer.get(taskid); + DisruptorQueue exeQueue = innerTaskTransfer.get(taskId); if (exeQueue != null) { exeQueue.publish(tuple); } else { - serializeQueue.publish(tuple); + push(taskId, tuple); } + if (backpressureController.isBackpressureMode()) { + backpressureController.flowControl(); + } + } + + public void push(int taskId, TupleExt tuple) { + serializeQueue.publish(tuple); } protected AsyncLoopThread setupSerializeThread() { @@ -140,6 +151,10 @@ public class TaskTransfer { return serializeThread; } + public BackpressureController getBackpressureController() { + return backpressureController; + } + protected class TransferRunnable extends RunnableCallback implements EventHandler { private AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown(); @@ -148,6 +163,7 @@ public class TaskTransfer { public String getThreadName() { return taskName + "-" + TransferRunnable.class.getSimpleName(); } + @Override public void preRun() { @@ -156,61 +172,80 @@ public class TaskTransfer { @Override public void run() { - while (shutdown.get() == false) { serializeQueue.consumeBatchWhenAvailable(this); - } - } @Override public void postRun() { WorkerClassLoader.restoreThreadContext(); } + + public byte[] serialize(ITupleExt tuple) { + return serializer.serialize((TupleExt)tuple); + } @Override - public 
void onEvent(Object event, long sequence, boolean endOfBatch) - throws Exception { + public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception { if (event == null) { return; } - long start = System.currentTimeMillis(); + long start = System.nanoTime(); try { - TupleExt tuple = (TupleExt) event; + + ITupleExt tuple = (ITupleExt) event; int taskid = tuple.getTargetTaskId(); - byte[] tupleMessage = serializer.serialize(tuple); - TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage); IConnection conn = getConnection(taskid); if (conn != null) { + byte[] tupleMessage = serialize(tuple); + TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage); conn.send(taskMessage); } } finally { - long end = System.currentTimeMillis(); - timer.update(end - start); + long end = System.nanoTime(); + serializeTimer.update((end - start)/TimeUtils.NS_PER_US); } } - + protected IConnection getConnection(int taskId) { IConnection conn = null; WorkerSlot nodePort = taskNodeport.get(taskId); if (nodePort == null) { - String errormsg = "can`t not found IConnection to " + taskId; - LOG.warn("Intra transfer warn", new Exception(errormsg)); + String errormsg = "IConnection to " + taskId + " can't be found"; + LOG.warn("Internal transfer warn, throw tuple,", new Exception(errormsg)); } else { conn = nodeportSocket.get(nodePort); if (conn == null) { - String errormsg = "can`t not found nodePort " + nodePort; - LOG.warn("Intra transfer warn", new Exception(errormsg)); + String errormsg = "NodePort to" + nodePort + " can't be found"; + LOG.warn("Internal transfer warn, throw tuple,", new Exception(errormsg)); } } return conn; } + protected void pullTuples(Object event) { + TupleExt tuple = (TupleExt) event; + int taskid = tuple.getTargetTaskId(); + IConnection conn = getConnection(taskid); + if (conn != null) { + while (conn.available() == false) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + + } + } + byte[] tupleMessage = 
serializer.serialize(tuple); + TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage); + conn.send(taskMessage); + } + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java index 596fa35..f703f25 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java @@ -17,12 +17,12 @@ */ package com.alibaba.jstorm.task; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat; +import backtype.storm.generated.TaskHeartbeat; + import com.alibaba.jstorm.utils.TimeUtils; /** - * TkHbCacheTime is describle taskheartcache (Map<topologyId, Map<taskid, - * Map<tkHbCacheTime, time>>>) + * TkHbCacheTime is describle taskheartcache (Map<topologyId, Map<taskid, Map<tkHbCacheTime, time>>>) */ public class TkHbCacheTime { @@ -54,12 +54,13 @@ public class TkHbCacheTime { this.taskAssignedTime = taskAssignedTime; } - public void update(TaskHeartbeat zkTaskHeartbeat) { - int nowSecs = TimeUtils.current_time_secs(); - this.nimbusTime = nowSecs; - this.taskReportedTime = zkTaskHeartbeat.getTimeSecs(); - this.taskAssignedTime = - zkTaskHeartbeat.getTimeSecs() - zkTaskHeartbeat.getUptimeSecs(); + public void update(TaskHeartbeat taskHeartbeat) { + if (taskHeartbeat != null) { + int nowSecs = TimeUtils.current_time_secs(); + this.nimbusTime = nowSecs; + this.taskReportedTime = taskHeartbeat.get_time(); + this.taskAssignedTime = taskHeartbeat.get_time() - taskHeartbeat.get_uptime(); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java ---------------------------------------------------------------------- diff 
--git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java index 2be1592..4deb8ed 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java @@ -17,25 +17,21 @@ */ package com.alibaba.jstorm.task.acker; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.task.IBolt; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Tuple; - import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.RotatingMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; /** - * * @author yannian/Longda - * */ public class Acker implements IBolt { @@ -57,27 +53,19 @@ public class Acker implements IBolt { private long rotateTime; @Override - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; // pending = new TimeCacheMap<Object, AckObject>(timeoutSec, // TIMEOUT_BUCKET_NUM); this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM); - this.rotateTime = - 1000L - * JStormUtils.parseInt(stormConf - .get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30) - / (TIMEOUT_BUCKET_NUM - 1); + this.rotateTime = 1000L * JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30) / (TIMEOUT_BUCKET_NUM - 1); } @Override public void execute(Tuple input) { Object id = input.getValue(0); - AckObject curr = pending.get(id); - String stream_id = input.getSourceStreamId(); - if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) { if (curr == null) { curr = new AckObject(); @@ -95,17 +83,13 @@ public class Acker 
implements IBolt { } else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) { if (curr != null) { curr.update_ack(input.getValue(1)); - } else { // two case // one is timeout // the other is bolt's ack first come curr = new AckObject(); - - curr.val = Long.valueOf(input.getLong(1)); - + curr.val = input.getLong(1); pending.put(id, curr); - } } else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) { if (curr == null) { @@ -113,31 +97,23 @@ public class Acker implements IBolt { // already timeout, should go fail return; } - curr.failed = true; - } else { LOG.info("Unknow source stream"); return; } Integer task = curr.spout_task; - if (task != null) { - if (curr.val == 0) { pending.remove(id); List values = JStormUtils.mk_list(id); - collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values); - } else { - if (curr.failed) { pending.remove(id); List values = JStormUtils.mk_list(id); - collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, - values); + collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, values); } } } else { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java new file mode 100644 index 0000000..528fc6b --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.task.backpressure; + +import java.util.Map; + +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.utils.JStormUtils; + +public abstract class Backpressure { + private static final String BACKPRESSURE_DELAY_TIME = "topology.backpressure.delay.time"; + + protected volatile boolean isBackpressureEnable; + + protected volatile double highWaterMark; + protected volatile double lowWaterMark; + + protected volatile double triggerBpRatio; + + protected volatile long sleepTime; + + public Backpressure(Map stormConf) { + this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(stormConf); + this.highWaterMark = ConfigExtension.getBackpressureWaterMarkHigh(stormConf); + this.lowWaterMark = ConfigExtension.getBackpressureWaterMarkLow(stormConf); + this.triggerBpRatio = ConfigExtension.getBackpressureCoordinatorRatio(stormConf); + } + + protected void updateConfig(Map stormConf) { + if (stormConf == null) { + return; + } + + if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_ENABLE)) { + this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(stormConf); + } + + if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH)) { + this.highWaterMark = ConfigExtension.getBackpressureWaterMarkHigh(stormConf); + } + + if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW)) { + this.lowWaterMark = ConfigExtension.getBackpressureWaterMarkLow(stormConf); + } + + if 
(stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO)) { + this.triggerBpRatio = ConfigExtension.getBackpressureCoordinatorRatio(stormConf); + } + + if (stormConf.containsKey(BACKPRESSURE_DELAY_TIME)) { + long time = JStormUtils.parseLong(stormConf, 0l); + if (time != 0l) { + this.sleepTime = time; + } + } + } + + public boolean isBackpressureConfigChange(Map stormConf) { + if (stormConf == null) { + return false; + } + + if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_ENABLE) || + stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH) || + stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW) || + stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO) || + stormConf.containsKey(BACKPRESSURE_DELAY_TIME)) { + return true; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java new file mode 100644 index 0000000..82dd938 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.task.backpressure; + +import java.util.List; +import java.util.Map; + +import com.alibaba.jstorm.utils.JStormUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.task.TaskTransfer; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.tuple.Values; +import backtype.storm.utils.DisruptorQueue; + +import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent; +import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType; + +/** + * Flow Control + * + * @author Basti Liu + */ +public class BackpressureController extends Backpressure { + private static Logger LOG = LoggerFactory.getLogger(BackpressureController.class); + + private int taskId; + private DisruptorQueue queueControlled; + private int totalQueueSize; + private int queueSizeReduced; + + private boolean isBackpressureMode = false; + + private SpoutOutputCollector outputCollector; + + private long maxBound, minBound; + + public BackpressureController(Map conf, int taskId, DisruptorQueue queue, int queueSize) { + super(conf); + this.queueControlled = queue; + this.totalQueueSize = queueSize; + this.queueSizeReduced = queueSize; + this.taskId = taskId; + this.maxBound = 0l; + this.minBound = 0l; + } + + public void setOutputCollector(SpoutOutputCollector outputCollector) { + this.outputCollector = outputCollector; + } + + public void control(TopoMasterCtrlEvent ctrlEvent) { + if (isBackpressureEnable == false) { + return; + } + + EventType eventType = 
ctrlEvent.getEventType(); + LOG.debug("Received control event, " + eventType.toString()); + if (eventType.equals(EventType.startBackpressure)) { + List<Object> value = ctrlEvent.getEventValue(); + int flowCtrlTime = value.get(0) != null ? (Integer) value.get(0) : 0; + start(flowCtrlTime); + } else if (eventType.equals(EventType.stopBackpressure)) { + stop(); + } else if (eventType.equals(EventType.updateBackpressureConfig)) { + List<Object> value = ctrlEvent.getEventValue(); + if (value != null) { + Map stormConf = (Map) value.get(0); + updateConfig(stormConf); + + if (isBackpressureEnable == false) { + LOG.info("Disable backpressure in controller."); + resetBackpressureInfo(); + } else { + LOG.info("Enable backpressure in controller"); + } + } + } + } + + public void flowControl() { + if (isBackpressureEnable == false) { + return; + } + + try { + Thread.sleep(sleepTime); + while (isQueueCapacityAvailable() == false) { + Thread.sleep(1); + } + } catch (InterruptedException e) { + LOG.error("Sleep was interrupted!"); + } + } + + private void resetBackpressureInfo() { + sleepTime = 0l; + maxBound = 0l; + minBound = 0l; + queueSizeReduced = totalQueueSize; + isBackpressureMode = false; + } + + private void start(int flowCtrlTime) { + if (flowCtrlTime > 0) { + if (maxBound < flowCtrlTime) { + sleepTime = flowCtrlTime; + } else if (maxBound == flowCtrlTime) { + if (sleepTime >= maxBound) { + sleepTime++; + } else { + sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, sleepTime, true); + } + } else { + if (maxBound <= sleepTime) { + sleepTime++; + } else { + if (sleepTime >= flowCtrlTime) { + sleepTime = JStormUtils.halfValueOfSum(maxBound, sleepTime, true); + } else { + sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, sleepTime, true); + } + } + } + } else { + sleepTime++; + } + if (sleepTime > maxBound) { + maxBound = sleepTime; + } + + int size = totalQueueSize / 100; + queueSizeReduced = size > 10 ? 
size : 10; + isBackpressureMode = true; + + LOG.info("Start backpressure at spout-{}, sleepTime={}, queueSizeReduced={}, flowCtrlTime={}", taskId, sleepTime, queueSizeReduced, flowCtrlTime); + } + + private void stop() { + if (sleepTime == minBound) { + minBound = 0; + } + sleepTime = JStormUtils.halfValueOfSum(minBound, sleepTime, false); + + if (sleepTime == 0) { + resetBackpressureInfo(); + + TopoMasterCtrlEvent stopBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null); + outputCollector.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(stopBp)); + } else { + minBound = sleepTime; + } + + LOG.info("Stop backpressure at spout-{}, sleepTime={}, queueSizeReduced={}, flowCtrlTime={}", taskId, sleepTime, queueSizeReduced); + } + + public boolean isBackpressureMode() { + return isBackpressureMode & isBackpressureEnable; + } + + public boolean isQueueCapacityAvailable() { + return (queueControlled.population() < queueSizeReduced); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java new file mode 100644 index 0000000..d270078 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.task.backpressure; + +import backtype.storm.generated.*; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.cluster.*; +import com.alibaba.jstorm.task.acker.Acker; +import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent; +import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.Map.Entry; + +/** + * Coordinator is responsible for the request from trigger and controller. + * - Event from trigger: + * Find relative controllers (source spouts), and decide if it is required to send out the request. + * - Event from controller: + * If backpressure stop event, send stop request to all target triggers. 
+ * + * @author Basti Li + */ +public class BackpressureCoordinator extends Backpressure { + private static final Logger LOG = LoggerFactory.getLogger(BackpressureCoordinator.class); + + private static final int adjustedTime = 5; + + private TopologyContext context; + private StormTopology topology; + private OutputCollector output; + + private int topologyMasterId; + private Map<Integer, String> taskIdToComponentId; + private Map<String, SpoutSpec> spouts; + private Map<String, Bolt> bolts; + + // Map<source componentId, Map<ComponentId, backpressure info>> + private Map<String, SourceBackpressureInfo> SourceTobackpressureInfo; + + private Integer period; + + private StormClusterState zkCluster; + private static final String BACKPRESSURE_TAG = "Backpressure has been "; + + + public BackpressureCoordinator(OutputCollector output, TopologyContext topologyContext, Integer taskId) { + super(topologyContext.getStormConf()); + this.context = topologyContext; + this.topology = topologyContext.getRawTopology(); + this.spouts = new HashMap<String, SpoutSpec>(); + if (this.topology.get_spouts() != null) { + this.spouts.putAll(this.topology.get_spouts()); + } + this.bolts = new HashMap<String, Bolt>(); + if (this.topology.get_bolts() != null) { + this.bolts.putAll(this.topology.get_bolts()); + } + this.taskIdToComponentId = topologyContext.getTaskToComponent(); + this.topologyMasterId = taskId; + + this.output = output; + + int checkInterval = ConfigExtension.getBackpressureCheckIntervl(context.getStormConf()); + int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(context.getStormConf()); + this.period = checkInterval * sampleNum; + + this.zkCluster = topologyContext.getZkCluster(); + try { + this.SourceTobackpressureInfo = zkCluster.get_backpressure_info(context.getTopologyId()); + if (this.SourceTobackpressureInfo == null) { + this.SourceTobackpressureInfo = new HashMap<String, SourceBackpressureInfo>(); + } else { + LOG.info("Successfully retrieve existing 
SourceTobackpressureInfo from zk: " + SourceTobackpressureInfo); + } + } catch (Exception e) { + LOG.warn("Failed to get SourceTobackpressureInfo from zk", e); + this.SourceTobackpressureInfo = new HashMap<String, SourceBackpressureInfo>(); + } + } + + private Set<String> getInputSpoutsForBolt(StormTopology topology, String boltComponentId, Set<String> componentsTraversed) { + Set<String> ret = new TreeSet<String>(); + + if (componentsTraversed == null) { + componentsTraversed = new HashSet<String>(); + } + + Bolt bolt = bolts.get(boltComponentId); + if (bolt == null) { + return ret; + } + + ComponentCommon common = bolt.get_common(); + Set<GlobalStreamId> inputstreams = common.get_inputs().keySet(); + Set<String> inputComponents = new TreeSet<String>(); + for (GlobalStreamId streamId : inputstreams) { + inputComponents.add(streamId.get_componentId()); + } + + Set<String> spoutComponentIds = new HashSet<String>(spouts.keySet()); + Set<String> boltComponentIds = new HashSet<String>(bolts.keySet()); + for (String inputComponent : inputComponents) { + // Skip the components which has been traversed before, to avoid dead loop when there are loop bolts in topology + if (componentsTraversed.contains(inputComponent)) { + continue; + } else { + componentsTraversed.add(inputComponent); + } + + if (spoutComponentIds.contains(inputComponent)) { + ret.add(inputComponent); + } else if (boltComponentIds.contains(inputComponent)) { + Set<String> inputs = getInputSpoutsForBolt(topology, inputComponent, componentsTraversed); + ret.addAll(inputs); + } + } + + return ret; + } + + public void process(Tuple input) { + if (isBackpressureEnable == false) { + return; + } + + int sourceTask = input.getSourceTask(); + String componentId = taskIdToComponentId.get(sourceTask); + if (componentId == null) { + LOG.warn("Receive tuple from unknown task-" + sourceTask); + return; + } + + if (spouts.keySet().contains(componentId)) { + if (SourceTobackpressureInfo.get(componentId) != null) { + 
handleEventFromSpout(sourceTask, input); + } + } else if (bolts.keySet().contains(componentId)) { + handleEventFromBolt(sourceTask, input); + } + } + + public void updateBackpressureConfig(Map conf) { + updateConfig(conf); + + if (isBackpressureEnable == false) { + LOG.info("Disable backpressure in coordinator."); + SourceTobackpressureInfo.clear(); + } else { + LOG.info("Enable backpressure in coordinator."); + } + + TopoMasterCtrlEvent updateBpConfig = new TopoMasterCtrlEvent(EventType.updateBackpressureConfig, new ArrayList<Object>()); + updateBpConfig.addEventValue(conf); + Values values = new Values(updateBpConfig); + Set<Integer> targetTasks = new TreeSet<Integer>(taskIdToComponentId.keySet()); + targetTasks.remove(topologyMasterId); + targetTasks.removeAll(context.getComponentTasks(Acker.ACKER_COMPONENT_ID)); + sendBackpressureMessage(targetTasks, values, EventType.updateBackpressureConfig); + + reportBackpressureStatus(); + } + + private boolean checkSpoutsUnderBackpressure(Set<String> spouts) { + boolean ret = false; + + if (spouts != null) { + for (String spout : spouts) { + SourceBackpressureInfo backpressureInfo = SourceTobackpressureInfo.get(spout); + if (backpressureInfo != null && backpressureInfo.getTasks().size() > 0) { + ret = true; + break; + } + } + } + + return ret; + } + + private TargetBackpressureInfo getBackpressureInfoBySourceSpout(String sourceSpout, String targetComponentId, boolean created) { + TargetBackpressureInfo ret = null; + + SourceBackpressureInfo info = SourceTobackpressureInfo.get(sourceSpout); + if (info == null) { + if (created) { + info = new SourceBackpressureInfo(); + SourceTobackpressureInfo.put(sourceSpout, info); + } + } else { + ret = info.getTargetTasks().get(targetComponentId); + } + + if (ret == null && created) { + ret = new TargetBackpressureInfo(); + info.getTargetTasks().put(targetComponentId, ret); + } + return ret; + } + + private boolean checkIntervalExpired(long time) { + boolean ret = false; + if (time != 
0) { + if (System.currentTimeMillis() - time > period) { + ret = true; + } + } + return ret; + } + + private void sendBackpressureMessage(Set<Integer> targetTasks, Values value, EventType backpressureType) { + for (Integer taskId : targetTasks) { + output.emitDirect(taskId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, value); + LOG.debug("Send " + backpressureType.toString() + " request to taskId-" + taskId); + } + } + + private void handleEventFromSpout(int sourceTask, Tuple input) { + TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent"); + EventType type = ctrlEvent.getEventType(); + + boolean update = false; + if (type.equals(EventType.stopBackpressure)) { + String spoutComponentId = taskIdToComponentId.get(sourceTask); + SourceBackpressureInfo info = SourceTobackpressureInfo.remove(spoutComponentId); + if (info != null) { + info.getTasks().remove(sourceTask); + if (info.getTasks().size() == 0) { + for (Entry<String, TargetBackpressureInfo> entry : info.getTargetTasks().entrySet()) { + String componentId = entry.getKey(); + + // Make sure if all source spouts for this bolt are NOT under backpressure mode. 
+ Set<String> sourceSpouts = getInputSpoutsForBolt(topology, componentId, null); + if (checkSpoutsUnderBackpressure(sourceSpouts) == false) { + Set<Integer> tasks = new TreeSet<Integer>(); + tasks.addAll(context.getComponentTasks(componentId)); + sendBackpressureMessage(tasks, new Values(ctrlEvent), type); + } + } + } + update = true; + } else { + LOG.error("Received event from non-recorded spout-" + sourceTask); + } + + } else { + LOG.warn("Received unexpected event, " + type.toString()); + } + + // If task set under backpressure has been changed, report the latest status + if (update) { + reportBackpressureStatus(); + } + } + + private void handleEventFromBolt(int sourceTask, Tuple input) { + String componentId = taskIdToComponentId.get(sourceTask); + Set<String> inputSpouts = getInputSpoutsForBolt(topology, componentId, null); + + TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent"); + EventType type = ctrlEvent.getEventType(); + Set<Integer> notifyList = new TreeSet<Integer>(); + Values values = null; + TargetBackpressureInfo info = null; + boolean update = false; + if (type.equals(EventType.startBackpressure)) { + int flowCtrlTime = (Integer) ctrlEvent.getEventValue().get(0); + for (String spout : inputSpouts) { + info = getBackpressureInfoBySourceSpout(spout, componentId, true); + SourceBackpressureInfo sourceInfo = SourceTobackpressureInfo.get(spout); + update = info.getTasks().add(sourceTask); + boolean add = false; + if (System.currentTimeMillis() - sourceInfo.getLastestTimeStamp() > period) { + add = true; + } else { + EventType lastestBpEvent = sourceInfo.getLastestBackpressureEvent(); + if (lastestBpEvent != null && lastestBpEvent.equals(EventType.startBackpressure) == false) { + add = true; + } + + int maxFlowCtrlTime = sourceInfo.getMaxFlowCtrlTime(); + if ((flowCtrlTime - maxFlowCtrlTime > adjustedTime || maxFlowCtrlTime == -1) && + flowCtrlTime >= 0) { + add = true; + } + } + info.setFlowCtrlTime(flowCtrlTime); + 
info.setBackpressureStatus(type); + + if (add) { + info.setTimeStamp(System.currentTimeMillis()); + // Only when the number of bolt tasks sending request is more than a configured number, coordinator will + // send out backpressure request to controller. It is used to avoid the problem that very few tasks might + // cause the over control. + double taskBpRatio = Double.valueOf(info.getTasks().size()) / Double.valueOf(context.getComponentTasks(componentId).size()) ; + if (taskBpRatio >= triggerBpRatio) { + Set<Integer> spoutTasks = new TreeSet<Integer>(context.getComponentTasks(spout)); + if (spoutTasks != null) { + SourceTobackpressureInfo.get(spout).getTasks().addAll(spoutTasks); + notifyList.addAll(spoutTasks); + } + } else { + update = false; + } + } else { + update = false; + } + } + + List<Object> value = new ArrayList<Object>(); + value.add(info.getFlowCtrlTime()); + TopoMasterCtrlEvent startBp = new TopoMasterCtrlEvent(EventType.startBackpressure, value); + values = new Values(startBp); + } else if (type.equals(EventType.stopBackpressure)) { + for (String spout : inputSpouts) { + info = getBackpressureInfoBySourceSpout(spout, componentId, false); + SourceBackpressureInfo sourceInfo = SourceTobackpressureInfo.get(spout); + if (info != null) { + Set<Integer> tasks = info.getTasks(); + if (tasks != null) { + if(tasks.remove(sourceTask)) { + update = true; + } + } + } + + if (sourceInfo != null && checkIntervalExpired(sourceInfo.getLastestTimeStamp())) { + info.setTimeStamp(System.currentTimeMillis()); + Set<Integer> spoutTasks = new TreeSet<Integer>(context.getComponentTasks(spout)); + if (spoutTasks != null) { + notifyList.addAll(spoutTasks); + } + info.setBackpressureStatus(type); + } + } + + // Check if all source spouts are Not under backpressure. If so, notify the bolt. 
+ if (checkSpoutsUnderBackpressure(inputSpouts) == false) { + notifyList.add(sourceTask); + } + + TopoMasterCtrlEvent stoptBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null); + values = new Values(stoptBp); + } else { + LOG.warn("Received unknown event " + type.toString()); + } + + sendBackpressureMessage(notifyList, values, type); + + // If task set under backpressure has been changed, report the latest status + if (update) { + LOG.info("inputspouts=" + inputSpouts + " for " + componentId + "-" + sourceTask + ", eventType=" + type.toString()); + reportBackpressureStatus(); + } + } + + private Set<Integer> getTasksUnderBackpressure() { + Set<Integer> ret = new HashSet<Integer>(); + + for (Entry<String, SourceBackpressureInfo> entry : SourceTobackpressureInfo.entrySet()) { + SourceBackpressureInfo sourceInfo = entry.getValue(); + if (sourceInfo.getTasks().size() > 0) { + ret.addAll(sourceInfo.getTasks()); + + for (Entry<String, TargetBackpressureInfo> targetEntry: sourceInfo.getTargetTasks().entrySet()) { + ret.addAll(targetEntry.getValue().getTasks()); + } + + } + } + + return ret; + } + + private void reportBackpressureStatus() { + try { + StringBuilder stringBuilder = new StringBuilder(); + Set<Integer> underTasks = getTasksUnderBackpressure(); + stringBuilder.append(BACKPRESSURE_TAG); + if (underTasks.isEmpty()){ + stringBuilder.append("closed "); + }else { + stringBuilder.append("opened: "); + stringBuilder.append(underTasks); + } + zkCluster.report_task_error(context.getTopologyId(), context.getThisTaskId(), stringBuilder.toString(), BACKPRESSURE_TAG); + zkCluster.set_backpressure_info(context.getTopologyId(), SourceTobackpressureInfo); + LOG.info(stringBuilder.toString()); + } catch (Exception e) { + LOG.error("can't update backpressure state ", e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java 
---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java new file mode 100644 index 0000000..0f2df95 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.jstorm.task.backpressure; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.DisruptorQueue; + +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.cluster.StormClusterState; +import com.alibaba.jstorm.task.Task; +import com.alibaba.jstorm.task.execute.BoltExecutors; +import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent; +import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType; +import com.alibaba.jstorm.utils.IntervalCheck; + +/** + * Responsible for checking if back pressure shall be triggered. + * When heavy load (the size of the queue monitored reaches high water mark), start back pressure, + * and when load goes down, stop back pressure. 
+ * + * @author Basti Liu + */ +public class BackpressureTrigger extends Backpressure { + private static final Logger LOG = LoggerFactory.getLogger(BackpressureTrigger.class); + + private Task task; + private int taskId; + + // Queue which is going to be monitored + private DisruptorQueue exeQueue; + private DisruptorQueue recvQueue; + + private BoltExecutors boltExecutor; + + private volatile boolean isUnderBackpressure = false; + + private IntervalCheck intervalCheck; + + OutputCollector output; + + private List<EventType> samplingSet; + private double triggerSampleRate; + + public BackpressureTrigger(Task task, BoltExecutors boltExecutor, Map stormConf, OutputCollector output) { + super(stormConf); + + this.task = task; + this.taskId = task.getTaskId(); + + int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(stormConf); + int smapleInterval = sampleNum * (ConfigExtension.getBackpressureCheckIntervl(stormConf)); + this.intervalCheck = new IntervalCheck(); + this.intervalCheck.setIntervalMs(smapleInterval); + this.intervalCheck.start(); + + this.samplingSet = new ArrayList<EventType>(); + this.triggerSampleRate = ConfigExtension.getBackpressureTriggerSampleRate(stormConf); + + this.output = output; + + this.boltExecutor = boltExecutor; + + try { + StormClusterState zkCluster = task.getZkCluster(); + Map<String, SourceBackpressureInfo> backpressureInfo = zkCluster.get_backpressure_info(task.getTopologyId()); + if (backpressureInfo != null) { + for (Entry<String, SourceBackpressureInfo> entry : backpressureInfo.entrySet()) { + SourceBackpressureInfo info = entry.getValue(); + Map<String, TargetBackpressureInfo> targetInfoMap = info.getTargetTasks(); + if (targetInfoMap != null) { + TargetBackpressureInfo targetInfo = targetInfoMap.get(task.getComponentId()); + if (targetInfo != null && targetInfo.getTasks().contains(taskId)) { + isBackpressureEnable = true; + LOG.info("Retrieved backpressure info for task-" + taskId); + } + } + } + } + } catch 
(Exception e) { + LOG.info("Failed to get backpressure info from zk", e); + } + LOG.info("Finished BackpressureTrigger init, highWaterMark=" + highWaterMark + ", lowWaterMark=" + lowWaterMark + ", sendInterval=" + + intervalCheck.getInterval()); + } + + public void checkAndTrigger() { + if (isBackpressureEnable == false) { + return; + } + + if (exeQueue == null || recvQueue == null) { + exeQueue = task.getExecuteQueue(); + recvQueue = task.getDeserializeQueue(); + + if (exeQueue == null) { + LOG.info("Init of excutor-task-" + taskId + " has not been finished!"); + return; + } + if (recvQueue == null) { + LOG.info("Init of receiver-task-" + taskId + " has not been finished!"); + return; + } + } + + LOG.debug("Backpressure Check: exeQueue load=" + (exeQueue.pctFull() * 100) + ", recvQueue load=" + (recvQueue.pctFull() * 100)); + if (exeQueue.pctFull() > highWaterMark) { + samplingSet.add(EventType.startBackpressure); + } else if (exeQueue.pctFull() <= lowWaterMark) { + samplingSet.add(EventType.stopBackpressure); + } else { + samplingSet.add(EventType.defaultType); + } + + if (intervalCheck.check()) { + int startCount = 0, stopCount = 0; + + for (EventType eventType : samplingSet) { + if (eventType.equals(EventType.startBackpressure)) { + startCount++; + } else if (eventType.equals(EventType.stopBackpressure)) { + stopCount++; + } + } + + if (startCount > stopCount) { + if (sampleRateCheck(startCount)) { + startBackpressure(); + isUnderBackpressure = true; + } + } else { + if (sampleRateCheck(stopCount) && isUnderBackpressure == true) { + stopBackpressure(); + } + } + + samplingSet.clear(); + } + } + + private boolean sampleRateCheck(double count) { + double sampleRate = count / samplingSet.size(); + if (sampleRate > triggerSampleRate) + return true; + else + return false; + } + + public void handle(Tuple input) { + try { + TopoMasterCtrlEvent event = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent"); + EventType type = event.getEventType(); + if 
(type.equals(EventType.stopBackpressure)) { + isUnderBackpressure = false; + LOG.info("Received stop backpressure event for task-" + task.getTaskId()); + } else if (type.equals(EventType.updateBackpressureConfig)) { + Map stormConf = (Map) event.getEventValue().get(0); + updateConfig(stormConf); + + if (isBackpressureEnable == false) { + LOG.info("Disable backpressure in trigger."); + isUnderBackpressure = false; + samplingSet.clear(); + } else { + LOG.info("Enable backpressure in trigger."); + } + } else { + LOG.info("Received unexpected event, " + type.toString()); + } + } catch (Exception e) { + LOG.error("Failed to handle event", e); + } + } + + private void startBackpressure() { + List<Object> value = new ArrayList<Object>(); + Double flowCtrlTime = Double.valueOf(boltExecutor.getExecuteTime() / 1000); + value.add(flowCtrlTime.intValue()); + TopoMasterCtrlEvent startBp = new TopoMasterCtrlEvent(EventType.startBackpressure, value); + output.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(startBp)); + LOG.debug("Send start backpressure request for task-{}, flowCtrlTime={}", taskId, flowCtrlTime.intValue()); + } + + private void stopBackpressure() { + TopoMasterCtrlEvent stopBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null); + output.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(stopBp)); + LOG.debug("Send stop backpressure request for task-{}", taskId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java new file mode 100644 index 0000000..05f7d11 --- /dev/null +++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.task.backpressure; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.Map.Entry; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType; + +public class SourceBackpressureInfo implements Serializable { + private static final long serialVersionUID = -8213491092461721871L; + + // source tasks under backpressure + private Set<Integer> tasks; + + // target tasks which has sent request to source task + // Map<componentId, source task backpressure info> + private Map<String, TargetBackpressureInfo> targetTasks; + + public SourceBackpressureInfo() { + this.tasks = new TreeSet<Integer>(); + this.targetTasks = new HashMap<String, TargetBackpressureInfo>(); + } + + public Set<Integer> getTasks() { + return tasks; + } + + public Map<String, TargetBackpressureInfo> getTargetTasks() { + return targetTasks; + } + + 
public long getLastestTimeStamp() { + long ret = 0; + + for (Entry<String, TargetBackpressureInfo> entry : targetTasks.entrySet()) { + TargetBackpressureInfo info = entry.getValue(); + if (info.getTimeStamp() > ret) { + ret = info.getTimeStamp(); + } + } + return ret; + } + + public EventType getLastestBackpressureEvent() { + EventType ret = null; + long timeStamp = 0; + + for (Entry<String, TargetBackpressureInfo> entry : targetTasks.entrySet()) { + TargetBackpressureInfo info = entry.getValue(); + if (info.getTimeStamp() > timeStamp) { + timeStamp = info.getTimeStamp(); + ret = info.getBackpressureStatus(); + } + } + + return ret; + } + + public int getMaxFlowCtrlTime() { + int ret = 0; + + for (Entry<String, TargetBackpressureInfo> entry : targetTasks.entrySet()) { + TargetBackpressureInfo info = entry.getValue(); + if (info.getFlowCtrlTime() > ret) { + ret = info.getFlowCtrlTime(); + } + } + return ret; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java new file mode 100644 index 0000000..2f6332b --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.jstorm.task.backpressure;

import java.io.Serializable;
import java.util.Set;
import java.util.TreeSet;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;

/**
 * Serializable record of one target's backpressure request: the tasks involved, the last
 * status, the flow control time and the time stamp.
 */
public class TargetBackpressureInfo implements Serializable {
    private static final long serialVersionUID = -1829897435773792484L;

    private Set<Integer> tasks;

    private EventType backpressureStatus;
    private int flowCtrlTime;
    private long timeStamp;

    /**
     * Creates a record with an empty task set, status {@code EventType.defaultType},
     * flow control time -1 and time stamp 0.
     */
    public TargetBackpressureInfo() {
        this(EventType.defaultType, -1, 0L);
    }

    /**
     * Creates a record with an empty task set and the given values.
     *
     * @param backpressureStatus initial status
     * @param flowCtrlTime       initial flow control time
     * @param time               initial time stamp
     */
    public TargetBackpressureInfo(EventType backpressureStatus, int flowCtrlTime, long time) {
        this.tasks = new TreeSet<Integer>();
        this.backpressureStatus = backpressureStatus;
        this.flowCtrlTime = flowCtrlTime;
        this.timeStamp = time;
    }

    /** @return the live (mutable) set of tasks held by this record */
    public Set<Integer> getTasks() {
        return tasks;
    }

    /** @return the stored backpressure status */
    public EventType getBackpressureStatus() {
        return this.backpressureStatus;
    }

    /** @param status new backpressure status */
    public void setBackpressureStatus(EventType status) {
        this.backpressureStatus = status;
    }

    /** @return the stored time stamp */
    public long getTimeStamp() {
        return this.timeStamp;
    }

    /** @param time new time stamp */
    public void setTimeStamp(long time) {
        this.timeStamp = time;
    }

    /** @return the stored flow control time */
    public int getFlowCtrlTime() {
        return this.flowCtrlTime;
    }

    /** @param time new flow control time */
    public void setFlowCtrlTime(int time) {
        this.flowCtrlTime = time;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
    }
}
