http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/ISpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/ISpout.java b/jstorm-core/src/main/java/backtype/storm/spout/ISpout.java index c421fed..ae86448 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/ISpout.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/ISpout.java @@ -22,83 +22,83 @@ import java.util.Map; import java.io.Serializable; /** - * ISpout is the core interface for implementing spouts. A Spout is responsible - * for feeding messages into the topology for processing. For every tuple emitted by - * a spout, Storm will track the (potentially very large) DAG of tuples generated - * based on a tuple emitted by the spout. When Storm detects that every tuple in - * that DAG has been successfully processed, it will send an ack message to the Spout. - * - * <p>If a tuple fails to be fully processed within the configured timeout for the - * topology (see {@link backtype.storm.Config}), Storm will send a fail message to the spout - * for the message.</p> - * - * <p> When a Spout emits a tuple, it can tag the tuple with a message id. The message id - * can be any type. When Storm acks or fails a message, it will pass back to the - * spout the same message id to identify which tuple it's referring to. If the spout leaves out - * the message id, or sets it to null, then Storm will not track the message and the spout - * will not receive any ack or fail callbacks for the message.</p> - * - * <p>Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor - * of an ISpout does not need to worry about concurrency issues between those methods. 
However, it - * also means that an implementor must ensure that nextTuple is non-blocking: otherwise - * the method could block acks and fails that are pending to be processed.</p> + * ISpout is the core interface for implementing spouts. A Spout is responsible for feeding messages into the topology for processing. For every tuple emitted + * by a spout, Storm will track the (potentially very large) DAG of tuples generated based on a tuple emitted by the spout. When Storm detects that every tuple + * in that DAG has been successfully processed, it will send an ack message to the Spout. + * + * <p> + * If a tuple fails to be fully processed within the configured timeout for the topology (see {@link backtype.storm.Config}), Storm will send a fail message to + * the spout for the message. + * </p> + * + * <p> + * When a Spout emits a tuple, it can tag the tuple with a message id. The message id can be any type. When Storm acks or fails a message, it will pass back to + * the spout the same message id to identify which tuple it's referring to. If the spout leaves out the message id, or sets it to null, then Storm will not + * track the message and the spout will not receive any ack or fail callbacks for the message. + * </p> + * + * <p> + * Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor of an ISpout does not need to worry about concurrency issues + * between those methods. However, it also means that an implementor must ensure that nextTuple is non-blocking: otherwise the method could block acks and fails + * that are pending to be processed. + * </p> */ public interface ISpout extends Serializable { /** - * Called when a task for this component is initialized within a worker on the cluster. - * It provides the spout with the environment in which the spout executes. - * - * <p>This includes the:</p> - * - * @param conf The Storm configuration for this spout. 
This is the configuration provided to the topology merged in with cluster configuration on this machine. - * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc. - * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object. + * Called when a task for this component is initialized within a worker on the cluster. It provides the spout with the environment in which the spout + * executes. + * + * <p> + * This includes the: + * </p> + * + * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this + * machine. + * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this + * task, input and output information, etc. + * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The + * collector is thread-safe and should be saved as an instance variable of this spout object. */ void open(Map conf, TopologyContext context, SpoutOutputCollector collector); /** - * Called when an ISpout is going to be shutdown. There is no guarentee that close - * will be called, because the supervisor kill -9's worker processes on the cluster. - * - * <p>The one context where close is guaranteed to be called is a topology is - * killed when running Storm in local mode.</p> + * Called when an ISpout is going to be shut down. There is no guarantee that close will be called, because the supervisor kill -9's worker processes on the + * cluster.
+ * + * <p> + * The one context where close is guaranteed to be called is when a topology is killed while running Storm in local mode. + * </p> */ void close(); - + /** - * Called when a spout has been activated out of a deactivated mode. - * nextTuple will be called on this spout soon. A spout can become activated - * after having been deactivated when the topology is manipulated using the - * `storm` client. + * Called when a spout has been activated out of a deactivated mode. nextTuple will be called on this spout soon. A spout can become activated after having + * been deactivated when the topology is manipulated using the `storm` client. */ void activate(); - + /** - * Called when a spout has been deactivated. nextTuple will not be called while - * a spout is deactivated. The spout may or may not be reactivated in the future. + * Called when a spout has been deactivated. nextTuple will not be called while a spout is deactivated. The spout may or may not be reactivated in the + * future. */ void deactivate(); /** - * When this method is called, Storm is requesting that the Spout emit tuples to the - * output collector. This method should be non-blocking, so if the Spout has no tuples - * to emit, this method should return. nextTuple, ack, and fail are all called in a tight - * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous - * to have nextTuple sleep for a short amount of time (like a single millisecond) - * so as not to waste too much CPU. + * When this method is called, Storm is requesting that the Spout emit tuples to the output collector. This method should be non-blocking, so if the Spout + * has no tuples to emit, this method should return. nextTuple, ack, and fail are all called in a tight loop in a single thread in the spout task. When + * there are no tuples to emit, it is courteous to have nextTuple sleep for a short amount of time (like a single millisecond) so as not to waste too much + * CPU.
*/ void nextTuple(); /** - * Storm has determined that the tuple emitted by this spout with the msgId identifier - * has been fully processed. Typically, an implementation of this method will take that - * message off the queue and prevent it from being replayed. + * Storm has determined that the tuple emitted by this spout with the msgId identifier has been fully processed. Typically, an implementation of this method + * will take that message off the queue and prevent it from being replayed. */ void ack(Object msgId); /** - * The tuple emitted by this spout with the msgId identifier has failed to be - * fully processed. Typically, an implementation of this method will put that + * The tuple emitted by this spout with the msgId identifier has failed to be fully processed. Typically, an implementation of this method will put that * message back on the queue to be replayed at a later time. */ void fail(Object msgId);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java index 3cebe43..00640ea 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java @@ -21,10 +21,11 @@ import java.util.List; public interface ISpoutOutputCollector { /** - Returns the task ids that received the tuples. - */ + * Returns the task ids that received the tuples. + */ List<Integer> emit(String streamId, List<Object> tuple, Object messageId); + void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); + void reportError(Throwable error); } - http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java b/jstorm-core/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java index d0bdfa8..7fc288a 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java @@ -20,15 +20,14 @@ package backtype.storm.spout; import java.util.Map; /** - * The strategy a spout needs to use when its waiting. Waiting is - * triggered in one of two conditions: + * The strategy a spout needs to use when its waiting. Waiting is triggered in one of two conditions: * - * 1. nextTuple emits no tuples - * 2. The spout has hit maxSpoutPending and can't emit any more tuples + * 1. nextTuple emits no tuples 2. 
The spout has hit maxSpoutPending and can't emit any more tuples * * The default strategy sleeps for one millisecond. */ public interface ISpoutWaitStrategy { void prepare(Map conf); + void emptyEmit(long streak); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/MultiScheme.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/MultiScheme.java b/jstorm-core/src/main/java/backtype/storm/spout/MultiScheme.java index ca2ce91..b75bd51 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/MultiScheme.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/MultiScheme.java @@ -23,6 +23,7 @@ import java.io.Serializable; import backtype.storm.tuple.Fields; public interface MultiScheme extends Serializable { - public Iterable<List<Object>> deserialize(byte[] ser); - public Fields getOutputFields(); + public Iterable<List<Object>> deserialize(byte[] ser); + + public Fields getOutputFields(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java b/jstorm-core/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java index 36bea94..7f0df27 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java @@ -21,7 +21,7 @@ import java.util.Map; public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy { @Override - public void emptyEmit(long streak) { + public void emptyEmit(long streak) { } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/RawMultiScheme.java 
---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/RawMultiScheme.java b/jstorm-core/src/main/java/backtype/storm/spout/RawMultiScheme.java index 7f73975..8c31097 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/RawMultiScheme.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/RawMultiScheme.java @@ -21,18 +21,17 @@ import java.util.List; import backtype.storm.tuple.Fields; - import static backtype.storm.utils.Utils.tuple; import static java.util.Arrays.asList; public class RawMultiScheme implements MultiScheme { - @Override - public Iterable<List<Object>> deserialize(byte[] ser) { - return asList(tuple(ser)); - } + @Override + public Iterable<List<Object>> deserialize(byte[] ser) { + return asList(tuple(ser)); + } - @Override - public Fields getOutputFields() { - return new Fields("bytes"); - } + @Override + public Fields getOutputFields() { + return new Fields("bytes"); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/Scheme.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/Scheme.java b/jstorm-core/src/main/java/backtype/storm/spout/Scheme.java index ca68954..6c8aeed 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/Scheme.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/Scheme.java @@ -21,8 +21,8 @@ import backtype.storm.tuple.Fields; import java.io.Serializable; import java.util.List; - public interface Scheme extends Serializable { public List<Object> deserialize(byte[] ser); + public Fields getOutputFields(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java 
b/jstorm-core/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java index 29f7fce..3414533 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java @@ -23,19 +23,23 @@ import java.util.List; import backtype.storm.tuple.Fields; public class SchemeAsMultiScheme implements MultiScheme { - public final Scheme scheme; + public final Scheme scheme; - public SchemeAsMultiScheme(Scheme scheme) { - this.scheme = scheme; - } + public SchemeAsMultiScheme(Scheme scheme) { + this.scheme = scheme; + } - @Override public Iterable<List<Object>> deserialize(final byte[] ser) { - List<Object> o = scheme.deserialize(ser); - if(o == null) return null; - else return Arrays.asList(o); - } + @Override + public Iterable<List<Object>> deserialize(final byte[] ser) { + List<Object> o = scheme.deserialize(ser); + if (o == null) + return null; + else + return Arrays.asList(o); + } - @Override public Fields getOutputFields() { - return scheme.getOutputFields(); - } + @Override + public Fields getOutputFields() { + return scheme.getOutputFields(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/ShellSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/ShellSpout.java b/jstorm-core/src/main/java/backtype/storm/spout/ShellSpout.java index 06c1647..f680550 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/ShellSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/ShellSpout.java @@ -25,29 +25,28 @@ import backtype.storm.multilang.ShellMsg; import backtype.storm.multilang.SpoutMsg; import backtype.storm.task.TopologyContext; import backtype.storm.utils.ShellProcess; -import java.util.Map; +import clojure.lang.RT; +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + import java.util.List; +import java.util.Map; import java.util.TimerTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import clojure.lang.RT; -import com.google.common.util.concurrent.MoreExecutors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - public class ShellSpout implements ISpout { public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class); private SpoutOutputCollector _collector; private String[] _command; private ShellProcess _process; - + private TopologyContext _context; - + private SpoutMsg _spoutMsg; private int workerTimeoutMills; @@ -62,8 +61,7 @@ public class ShellSpout implements ISpout { _command = command; } - public void open(Map stormConf, TopologyContext context, - SpoutOutputCollector collector) { + public void open(Map stormConf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _context = context; @@ -108,25 +106,25 @@ public class ShellSpout implements ISpout { _spoutMsg.setId(msgId); querySubprocess(); } - + private void handleMetrics(ShellMsg shellMsg) { - //get metric name + // get metric name String name = shellMsg.getMetricName(); if (name.isEmpty()) { throw new RuntimeException("Receive Metrics name is empty"); } - - //get metric by name + + // get metric by name IMetric iMetric = _context.getRegisteredMetricByName(name); if (iMetric == null) { - throw new RuntimeException("Could not find metric by name["+name+"] "); + throw new RuntimeException("Could not find metric by name[" + name + "] "); } - if ( !(iMetric instanceof IShellMetric)) { - throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC"); + if (!(iMetric instanceof IShellMetric)) { + throw new RuntimeException("Metric[" + name + "] is not IShellMetric, can not call by RPC"); } - IShellMetric iShellMetric = 
(IShellMetric)iMetric; - - //call updateMetricFromRPC with params + IShellMetric iShellMetric = (IShellMetric) iMetric; + + // call updateMetricFromRPC with params Object paramsObj = shellMsg.getMetricParams(); try { iShellMetric.updateMetricFromRPC(paramsObj); @@ -134,7 +132,7 @@ public class ShellSpout implements ISpout { throw re; } catch (Exception e) { throw new RuntimeException(e); - } + } } private void querySubprocess() { @@ -187,24 +185,24 @@ public class ShellSpout implements ISpout { ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel(); switch (logLevel) { - case TRACE: - LOG.trace(msg); - break; - case DEBUG: - LOG.debug(msg); - break; - case INFO: - LOG.info(msg); - break; - case WARN: - LOG.warn(msg); - break; - case ERROR: - LOG.error(msg); - break; - default: - LOG.info(msg); - break; + case TRACE: + LOG.trace(msg); + break; + case DEBUG: + LOG.debug(msg); + break; + case INFO: + LOG.info(msg); + break; + case WARN: + LOG.warn(msg); + break; + case ERROR: + LOG.error(msg); + break; + default: + LOG.info(msg); + break; } } @@ -254,8 +252,7 @@ public class ShellSpout implements ISpout { long currentTimeMillis = System.currentTimeMillis(); long lastHeartbeat = getLastHeartbeat(); - LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}", - currentTimeMillis, lastHeartbeat, workerTimeoutMills); + LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}", currentTimeMillis, lastHeartbeat, workerTimeoutMills); if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { spout.die(new RuntimeException("subprocess heartbeat timeout")); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java b/jstorm-core/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java index 
3ccf4e1..e01f668 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java @@ -20,11 +20,10 @@ package backtype.storm.spout; import backtype.storm.Config; import java.util.Map; - public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy { long sleepMillis; - + @Override public void prepare(Map conf) { sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/SpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/SpoutOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/spout/SpoutOutputCollector.java index 7a33026..4b9dc51 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/SpoutOutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/SpoutOutputCollector.java @@ -22,11 +22,9 @@ import backtype.storm.utils.Utils; import java.util.List; /** - * This output collector exposes the API for emitting tuples from an {@link backtype.storm.topology.IRichSpout}. - * The main difference between this output collector and {@link OutputCollector} - * for {@link backtype.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be - * acked or failed later on. This is the Spout portion of Storm's API to - * guarantee that each message is fully processed at least once. + * This output collector exposes the API for emitting tuples from an {@link backtype.storm.topology.IRichSpout}. The main difference between this output + * collector and {@link OutputCollector} for {@link backtype.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be acked or + * failed later on. 
This is the Spout portion of Storm's API to guarantee that each message is fully processed at least once. */ public class SpoutOutputCollector implements ISpoutOutputCollector { ISpoutOutputCollector _delegate; @@ -36,13 +34,10 @@ public class SpoutOutputCollector implements ISpoutOutputCollector { } /** - * Emits a new tuple to the specified output stream with the given message ID. - * When Storm detects that this tuple has been fully processed, or has failed - * to be fully processed, the spout will receive an ack or fail callback respectively - * with the messageId as long as the messageId was not null. If the messageId was null, - * Storm will not track the tuple and no callback will be received. The emitted values must be - * immutable. - * + * Emits a new tuple to the specified output stream with the given message ID. When Storm detects that this tuple has been fully processed, or has failed to + * be fully processed, the spout will receive an ack or fail callback respectively with the messageId as long as the messageId was not null. If the + * messageId was null, Storm will not track the tuple and no callback will be received. The emitted values must be immutable. + * * @return the list of task ids that this tuple was sent to */ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { @@ -50,13 +45,10 @@ public class SpoutOutputCollector implements ISpoutOutputCollector { } /** - * Emits a new tuple to the default output stream with the given message ID. - * When Storm detects that this tuple has been fully processed, or has failed - * to be fully processed, the spout will receive an ack or fail callback respectively - * with the messageId as long as the messageId was not null. If the messageId was null, - * Storm will not track the tuple and no callback will be received. The emitted values must be - * immutable. - * + * Emits a new tuple to the default output stream with the given message ID. 
When Storm detects that this tuple has been fully processed, or has failed to + * be fully processed, the spout will receive an ack or fail callback respectively with the messageId as long as the messageId was not null. If the + * messageId was null, Storm will not track the tuple and no callback will be received. The emitted values must be immutable. + * * @return the list of task ids that this tuple was sent to */ public List<Integer> emit(List<Object> tuple, Object messageId) { @@ -64,64 +56,56 @@ public class SpoutOutputCollector implements ISpoutOutputCollector { } /** - * Emits a tuple to the default output stream with a null message id. Storm will - * not track this message so ack and fail will never be called for this tuple. The - * emitted values must be immutable. + * Emits a tuple to the default output stream with a null message id. Storm will not track this message so ack and fail will never be called for this tuple. + * The emitted values must be immutable. */ public List<Integer> emit(List<Object> tuple) { return emit(tuple, null); } /** - * Emits a tuple to the specified output stream with a null message id. Storm will - * not track this message so ack and fail will never be called for this tuple. The - * emitted values must be immutable. + * Emits a tuple to the specified output stream with a null message id. Storm will not track this message so ack and fail will never be called for this + * tuple. The emitted values must be immutable. */ public List<Integer> emit(String streamId, List<Object> tuple) { return emit(streamId, tuple, null); } /** - * Emits a tuple to the specified task on the specified output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. The emitted values must be - * immutable. + * Emits a tuple to the specified task on the specified output stream. 
This output stream must have been declared as a direct stream, and the specified task + * must use a direct grouping on this stream to receive the message. The emitted values must be immutable. */ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { _delegate.emitDirect(taskId, streamId, tuple, messageId); } /** - * Emits a tuple to the specified task on the default output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. The emitted values must be - * immutable. + * Emits a tuple to the specified task on the default output stream. This output stream must have been declared as a direct stream, and the specified task + * must use a direct grouping on this stream to receive the message. The emitted values must be immutable. */ public void emitDirect(int taskId, List<Object> tuple, Object messageId) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId); } - + /** - * Emits a tuple to the specified task on the specified output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. The emitted values must be - * immutable. - * - * <p> Because no message id is specified, Storm will not track this message - * so ack and fail will never be called for this tuple.</p> + * Emits a tuple to the specified task on the specified output stream. This output stream must have been declared as a direct stream, and the specified task + * must use a direct grouping on this stream to receive the message. The emitted values must be immutable. + * + * <p> + * Because no message id is specified, Storm will not track this message so ack and fail will never be called for this tuple. 
+ * </p> */ public void emitDirect(int taskId, String streamId, List<Object> tuple) { emitDirect(taskId, streamId, tuple, null); } /** - * Emits a tuple to the specified task on the default output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. The emitted values must be - * immutable. - * - * <p> Because no message id is specified, Storm will not track this message - * so ack and fail will never be called for this tuple.</p> + * Emits a tuple to the specified task on the default output stream. This output stream must have been declared as a direct stream, and the specified task + * must use a direct grouping on this stream to receive the message. The emitted values must be immutable. + * + * <p> + * Because no message id is specified, Storm will not track this message so ack and fail will never be called for this tuple. + * </p> */ public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, tuple, null); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/state/IStateSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/state/IStateSpout.java b/jstorm-core/src/main/java/backtype/storm/state/IStateSpout.java index f4aa14f..7d6efd4 100755 --- a/jstorm-core/src/main/java/backtype/storm/state/IStateSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/state/IStateSpout.java @@ -23,7 +23,10 @@ import java.util.Map; public interface IStateSpout extends Serializable { void open(Map conf, TopologyContext context); + void close(); + void nextTuple(StateSpoutOutputCollector collector); + void synchronize(SynchronizeOutputCollector collector); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/state/ISubscribedState.java 
---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/state/ISubscribedState.java b/jstorm-core/src/main/java/backtype/storm/state/ISubscribedState.java index 6eff72c..ab9b60c 100755 --- a/jstorm-core/src/main/java/backtype/storm/state/ISubscribedState.java +++ b/jstorm-core/src/main/java/backtype/storm/state/ISubscribedState.java @@ -21,5 +21,6 @@ import backtype.storm.tuple.Tuple; public interface ISubscribedState { void set(Object id, Tuple tuple); + void remove(Object id); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java index 9c80a75..926ea38 100755 --- a/jstorm-core/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java @@ -20,5 +20,5 @@ package backtype.storm.state; import java.util.List; public interface ISynchronizeOutputCollector { - void add(int streamId, Object id, List<Object> tuple); + void add(int streamId, Object id, List<Object> tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java index 4bb10e0..b683835 100755 --- a/jstorm-core/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java @@ -17,7 +17,6 @@ */ package 
backtype.storm.state; - public class StateSpoutOutputCollector extends SynchronizeOutputCollector implements IStateSpoutOutputCollector { @Override http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java index 9fbba6e..6c9817f 100755 --- a/jstorm-core/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java @@ -19,7 +19,6 @@ package backtype.storm.state; import java.util.List; - public class SynchronizeOutputCollector implements ISynchronizeOutputCollector { @Override http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/GeneralTopologyContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/task/GeneralTopologyContext.java b/jstorm-core/src/main/java/backtype/storm/task/GeneralTopologyContext.java index 7540500..88127ae 100644 --- a/jstorm-core/src/main/java/backtype/storm/task/GeneralTopologyContext.java +++ b/jstorm-core/src/main/java/backtype/storm/task/GeneralTopologyContext.java @@ -17,14 +17,6 @@ */ package backtype.storm.task; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.json.simple.JSONAware; - import backtype.storm.Config; import backtype.storm.Constants; import backtype.storm.generated.ComponentCommon; @@ -34,6 +26,9 @@ import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.utils.ThriftTopologyUtils; import backtype.storm.utils.Utils; +import org.json.simple.JSONAware; 
+ +import java.util.*; public class GeneralTopologyContext implements JSONAware { private StormTopology _topology; @@ -42,11 +37,10 @@ public class GeneralTopologyContext implements JSONAware { private Map<String, Map<String, Fields>> _componentToStreamToFields; private String _topologyId; protected Map _stormConf; - + // pass in componentToSortedTasks for the case of running tons of tasks in single executor - public GeneralTopologyContext(StormTopology topology, Map stormConf, - Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, - Map<String, Map<String, Fields>> componentToStreamToFields, String topologyId) { + public GeneralTopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, + Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String topologyId) { _topology = topology; _stormConf = stormConf; _taskToComponent = taskToComponent; @@ -54,7 +48,7 @@ public class GeneralTopologyContext implements JSONAware { _componentToTasks = componentToSortedTasks; _componentToStreamToFields = componentToStreamToFields; } - + /** * Gets the unique id assigned to this topology. The id is the storm name with a unique nonce appended to it. * @@ -63,7 +57,7 @@ public class GeneralTopologyContext implements JSONAware { public String getTopologyId() { return _topologyId; } - + /** * Please use the getTopologId() instead. * @@ -73,7 +67,7 @@ public class GeneralTopologyContext implements JSONAware { public String getStormId() { return _topologyId; } - + /** * Gets the Thrift object representing the topology. * @@ -82,7 +76,7 @@ public class GeneralTopologyContext implements JSONAware { public StormTopology getRawTopology() { return _topology; } - + /** * Gets the component id for the specified task id. The component id maps to a component id specified for a Spout or Bolt in the topology definition. 
* @@ -96,14 +90,14 @@ public class GeneralTopologyContext implements JSONAware { return _taskToComponent.get(taskId); } } - + /** * Gets the set of streams declared for the specified component. */ public Set<String> getComponentStreams(String componentId) { return getComponentCommon(componentId).get_streams().keySet(); } - + /** * Gets the task ids allocated for the given component id. The task ids are always returned in ascending order. */ @@ -114,7 +108,7 @@ public class GeneralTopologyContext implements JSONAware { else return new ArrayList<Integer>(ret); } - + /** * Gets the declared output fields for the specified component/stream. */ @@ -125,14 +119,14 @@ public class GeneralTopologyContext implements JSONAware { } return ret; } - + /** * Gets the declared output fields for the specified global stream id. */ public Fields getComponentOutputFields(GlobalStreamId id) { return getComponentOutputFields(id.get_componentId(), id.get_streamId()); } - + /** * Gets the declared inputs to the specified component. * @@ -141,7 +135,7 @@ public class GeneralTopologyContext implements JSONAware { public Map<GlobalStreamId, Grouping> getSources(String componentId) { return getComponentCommon(componentId).get_inputs(); } - + /** * Gets information about who is consuming the outputs of the specified component, and how. * @@ -163,7 +157,7 @@ public class GeneralTopologyContext implements JSONAware { } return ret; } - + @Override public String toJSONString() { Map obj = new HashMap(); @@ -172,25 +166,25 @@ public class GeneralTopologyContext implements JSONAware { // at the minimum should send source info return Utils.to_json(obj); } - + /** * Gets a map from task id to component id. 
*/ public Map<Integer, String> getTaskToComponent() { return _taskToComponent; } - + /** * Gets a list of all component ids in this topology */ public Set<String> getComponentIds() { return ThriftTopologyUtils.getComponentIds(getRawTopology()); } - + public ComponentCommon getComponentCommon(String componentId) { return ThriftTopologyUtils.getComponentCommon(getRawTopology(), componentId); } - + public int maxTopologyMessageTimeout() { Integer max = Utils.getInt(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); for (String spout : getRawTopology().get_spouts().keySet()) { @@ -206,4 +200,8 @@ public class GeneralTopologyContext implements JSONAware { } return max; } + + public Map getStormConf() { + return _stormConf; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/IBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/task/IBolt.java b/jstorm-core/src/main/java/backtype/storm/task/IBolt.java index 48acdda..45f8eef 100755 --- a/jstorm-core/src/main/java/backtype/storm/task/IBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/task/IBolt.java @@ -22,63 +22,70 @@ import java.util.Map; import java.io.Serializable; /** - * An IBolt represents a component that takes tuples as input and produces tuples - * as output. An IBolt can do everything from filtering to joining to functions - * to aggregations. It does not have to process a tuple immediately and may - * hold onto tuples to process later. - * - * <p>A bolt's lifecycle is as follows:</p> - * - * <p>IBolt object created on client machine. The IBolt is serialized into the topology - * (using Java serialization) and submitted to the master machine of the cluster (Nimbus). 
- * Nimbus then launches workers which deserialize the object, call prepare on it, and then - * start processing tuples.</p> - * - * <p>If you want to parameterize an IBolt, you should set the parameters through its - * constructor and save the parameterization state as instance variables (which will - * then get serialized and shipped to every task executing this bolt across the cluster).</p> - * - * <p>When defining bolts in Java, you should use the IRichBolt interface which adds - * necessary methods for using the Java TopologyBuilder API.</p> + * An IBolt represents a component that takes tuples as input and produces tuples as output. An IBolt can do everything from filtering to joining to functions + * to aggregations. It does not have to process a tuple immediately and may hold onto tuples to process later. + * + * <p> + * A bolt's lifecycle is as follows: + * </p> + * + * <p> + * IBolt object created on client machine. The IBolt is serialized into the topology (using Java serialization) and submitted to the master machine of the + * cluster (Nimbus). Nimbus then launches workers which deserialize the object, call prepare on it, and then start processing tuples. + * </p> + * + * <p> + * If you want to parameterize an IBolt, you should set the parameters through its constructor and save the parameterization state as instance variables (which + * will then get serialized and shipped to every task executing this bolt across the cluster). + * </p> + * + * <p> + * When defining bolts in Java, you should use the IRichBolt interface which adds necessary methods for using the Java TopologyBuilder API. + * </p> */ public interface IBolt extends Serializable { /** - * Called when a task for this component is initialized within a worker on the cluster. - * It provides the bolt with the environment in which the bolt executes. - * - * <p>This includes the:</p> + * Called when a task for this component is initialized within a worker on the cluster. 
It provides the bolt with the environment in which the bolt + * executes. * - * @param stormConf The Storm configuration for this bolt. This is the configuration provided to the topology merged in with cluster configuration on this machine. - * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc. - * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The collector is thread-safe and should be saved as an instance variable of this bolt object. + * <p> + * This includes the: + * </p> + * + * @param stormConf The Storm configuration for this bolt. This is the configuration provided to the topology merged in with cluster configuration on this + * machine. + * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this + * task, input and output information, etc. + * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The + * collector is thread-safe and should be saved as an instance variable of this bolt object. */ void prepare(Map stormConf, TopologyContext context, OutputCollector collector); /** - * Process a single tuple of input. The Tuple object contains metadata on it - * about which component/stream/task it came from. The values of the Tuple can - * be accessed using Tuple#getValue. The IBolt does not have to process the Tuple - * immediately. It is perfectly fine to hang onto a tuple and process it later + * Process a single tuple of input. The Tuple object contains metadata on it about which component/stream/task it came from. The values of the Tuple can be + * accessed using Tuple#getValue. 
The IBolt does not have to process the Tuple immediately. It is perfectly fine to hang onto a tuple and process it later * (for instance, to do an aggregation or join). - * - * <p>Tuples should be emitted using the OutputCollector provided through the prepare method. - * It is required that all input tuples are acked or failed at some point using the OutputCollector. - * Otherwise, Storm will be unable to determine when tuples coming off the spouts - * have been completed.</p> - * - * <p>For the common case of acking an input tuple at the end of the execute method, - * see IBasicBolt which automates this.</p> + * + * <p> + * Tuples should be emitted using the OutputCollector provided through the prepare method. It is required that all input tuples are acked or failed at some + * point using the OutputCollector. Otherwise, Storm will be unable to determine when tuples coming off the spouts have been completed. + * </p> + * + * <p> + * For the common case of acking an input tuple at the end of the execute method, see IBasicBolt which automates this. + * </p> * * @param input The input tuple to be processed. */ void execute(Tuple input); /** - * Called when an IBolt is going to be shutdown. There is no guarentee that cleanup - * will be called, because the supervisor kill -9's worker processes on the cluster. - * - * <p>The one context where cleanup is guaranteed to be called is when a topology - * is killed when running Storm in local mode.</p> + * Called when an IBolt is going to be shutdown. There is no guarentee that cleanup will be called, because the supervisor kill -9's worker processes on the + * cluster. + * + * <p> + * The one context where cleanup is guaranteed to be called is when a topology is killed when running Storm in local mode. 
+ * </p> */ void cleanup(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/IMetricsContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/task/IMetricsContext.java b/jstorm-core/src/main/java/backtype/storm/task/IMetricsContext.java index a1d8bc2..cb77ab5 100755 --- a/jstorm-core/src/main/java/backtype/storm/task/IMetricsContext.java +++ b/jstorm-core/src/main/java/backtype/storm/task/IMetricsContext.java @@ -23,9 +23,10 @@ import backtype.storm.metric.api.IMetric; import backtype.storm.metric.api.IReducer; import backtype.storm.metric.api.ReducedMetric; - public interface IMetricsContext { <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs); + ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs); - CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs); + + CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/IOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/task/IOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/task/IOutputCollector.java index a62563a..1759310 100755 --- a/jstorm-core/src/main/java/backtype/storm/task/IOutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/task/IOutputCollector.java @@ -23,10 +23,13 @@ import java.util.List; public interface IOutputCollector extends IErrorReporter { /** - * Returns the task ids that received the tuples. + * Returns the task ids that received the tuples. 
*/ List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple); + void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple); + void ack(Tuple input); + void fail(Tuple input); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/OutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/task/OutputCollector.java b/jstorm-core/src/main/java/backtype/storm/task/OutputCollector.java index 620d33d..673a8b2 100755 --- a/jstorm-core/src/main/java/backtype/storm/task/OutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/task/OutputCollector.java @@ -24,22 +24,19 @@ import java.util.Collection; import java.util.List; /** - * This output collector exposes the API for emitting tuples from an IRichBolt. - * This is the core API for emitting tuples. For a simpler API, and a more restricted - * form of stream processing, see IBasicBolt and BasicOutputCollector. + * This output collector exposes the API for emitting tuples from an IRichBolt. This is the core API for emitting tuples. For a simpler API, and a more + * restricted form of stream processing, see IBasicBolt and BasicOutputCollector. */ public class OutputCollector implements IOutputCollector { private IOutputCollector _delegate; - - + public OutputCollector(IOutputCollector delegate) { _delegate = delegate; } - + /** - * Emits a new tuple to a specific stream with a single anchor. The emitted values must be - * immutable. - * + * Emits a new tuple to a specific stream with a single anchor. The emitted values must be immutable. + * * @param streamId the stream to emit to * @param anchor the tuple to anchor to * @param tuple the new output tuple from this bolt @@ -50,10 +47,8 @@ public class OutputCollector implements IOutputCollector { } /** - * Emits a new unanchored tuple to the specified stream. 
Because it's unanchored, - * if a failure happens downstream, this new tuple won't affect whether any - * spout tuples are considered failed or not. The emitted values must be - * immutable. + * Emits a new unanchored tuple to the specified stream. Because it's unanchored, if a failure happens downstream, this new tuple won't affect whether any + * spout tuples are considered failed or not. The emitted values must be immutable. * * @param streamId the stream to emit to * @param tuple the new output tuple from this bolt @@ -64,8 +59,7 @@ public class OutputCollector implements IOutputCollector { } /** - * Emits a new tuple to the default stream anchored on a group of input tuples. The emitted - * values must be immutable. + * Emits a new tuple to the default stream anchored on a group of input tuples. The emitted values must be immutable. * * @param anchors the tuples to anchor to * @param tuple the new output tuple from this bolt @@ -75,10 +69,8 @@ public class OutputCollector implements IOutputCollector { return emit(Utils.DEFAULT_STREAM_ID, anchors, tuple); } - /** - * Emits a new tuple to the default stream anchored on a single tuple. The emitted values must be - * immutable. + * Emits a new tuple to the default stream anchored on a single tuple. The emitted values must be immutable. * * @param anchor the tuple to anchor to * @param tuple the new output tuple from this bolt @@ -89,11 +81,9 @@ public class OutputCollector implements IOutputCollector { } /** - * Emits a new unanchored tuple to the default stream. Beacuse it's unanchored, - * if a failure happens downstream, this new tuple won't affect whether any - * spout tuples are considered failed or not. The emitted values must be - * immutable. - * + * Emits a new unanchored tuple to the default stream. Beacuse it's unanchored, if a failure happens downstream, this new tuple won't affect whether any + * spout tuples are considered failed or not. The emitted values must be immutable. 
+ * * @param tuple the new output tuple from this bolt * @return the list of task ids that this new tuple was sent to */ @@ -102,13 +92,10 @@ public class OutputCollector implements IOutputCollector { } /** - * Emits a tuple directly to the specified task id on the specified stream. - * If the target bolt does not subscribe to this bolt using a direct grouping, - * the tuple will not be sent. If the specified output stream is not declared - * as direct, or the target bolt subscribes with a non-direct grouping, - * an error will occur at runtime. The emitted values must be - * immutable. - * + * Emits a tuple directly to the specified task id on the specified stream. If the target bolt does not subscribe to this bolt using a direct grouping, the + * tuple will not be sent. If the specified output stream is not declared as direct, or the target bolt subscribes with a non-direct grouping, an error will + * occur at runtime. The emitted values must be immutable. + * * @param taskId the taskId to send the new tuple to * @param streamId the stream to send the tuple on. It must be declared as a direct stream in the topology definition. * @param anchor the tuple to anchor to @@ -119,14 +106,11 @@ public class OutputCollector implements IOutputCollector { } /** - * Emits a tuple directly to the specified task id on the specified stream. - * If the target bolt does not subscribe to this bolt using a direct grouping, - * the tuple will not be sent. If the specified output stream is not declared - * as direct, or the target bolt subscribes with a non-direct grouping, - * an error will occur at runtime. Note that this method does not use anchors, - * so downstream failures won't affect the failure status of any spout tuples. - * The emitted values must be immutable. - * + * Emits a tuple directly to the specified task id on the specified stream. If the target bolt does not subscribe to this bolt using a direct grouping, the + * tuple will not be sent. 
If the specified output stream is not declared as direct, or the target bolt subscribes with a non-direct grouping, an error will + * occur at runtime. Note that this method does not use anchors, so downstream failures won't affect the failure status of any spout tuples. The emitted + * values must be immutable. + * * @param taskId the taskId to send the new tuple to * @param streamId the stream to send the tuple on. It must be declared as a direct stream in the topology definition. * @param tuple the new output tuple from this bolt @@ -136,17 +120,15 @@ public class OutputCollector implements IOutputCollector { } /** - * Emits a tuple directly to the specified task id on the default stream. - * If the target bolt does not subscribe to this bolt using a direct grouping, - * the tuple will not be sent. If the specified output stream is not declared - * as direct, or the target bolt subscribes with a non-direct grouping, - * an error will occur at runtime. The emitted values must be - * immutable. - * - * <p>The default stream must be declared as direct in the topology definition. - * See OutputDeclarer#declare for how this is done when defining topologies - * in Java.</p> - * + * Emits a tuple directly to the specified task id on the default stream. If the target bolt does not subscribe to this bolt using a direct grouping, the + * tuple will not be sent. If the specified output stream is not declared as direct, or the target bolt subscribes with a non-direct grouping, an error will + * occur at runtime. The emitted values must be immutable. + * + * <p> + * The default stream must be declared as direct in the topology definition. See OutputDeclarer#declare for how this is done when defining topologies in + * Java. 
+ * </p> + * * @param taskId the taskId to send the new tuple to * @param anchosr the tuples to anchor to * @param tuple the new output tuple from this bolt @@ -156,17 +138,15 @@ public class OutputCollector implements IOutputCollector { } /** - * Emits a tuple directly to the specified task id on the default stream. - * If the target bolt does not subscribe to this bolt using a direct grouping, - * the tuple will not be sent. If the specified output stream is not declared - * as direct, or the target bolt subscribes with a non-direct grouping, - * an error will occur at runtime. The emitted values must be - * immutable. - * - * <p>The default stream must be declared as direct in the topology definition. - * See OutputDeclarer#declare for how this is done when defining topologies - * in Java.</p> - * + * Emits a tuple directly to the specified task id on the default stream. If the target bolt does not subscribe to this bolt using a direct grouping, the + * tuple will not be sent. If the specified output stream is not declared as direct, or the target bolt subscribes with a non-direct grouping, an error will + * occur at runtime. The emitted values must be immutable. + * + * <p> + * The default stream must be declared as direct in the topology definition. See OutputDeclarer#declare for how this is done when defining topologies in + * Java. + * </p> + * * @param taskId the taskId to send the new tuple to * @param anchor the tuple to anchor to * @param tuple the new output tuple from this bolt @@ -175,22 +155,20 @@ public class OutputCollector implements IOutputCollector { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, anchor, tuple); } - /** - * Emits a tuple directly to the specified task id on the default stream. - * If the target bolt does not subscribe to this bolt using a direct grouping, - * the tuple will not be sent. 
If the specified output stream is not declared - * as direct, or the target bolt subscribes with a non-direct grouping, - * an error will occur at runtime. The emitted values must be - * immutable. - * - * <p>The default stream must be declared as direct in the topology definition. - * See OutputDeclarer#declare for how this is done when defining topologies - * in Java.</p> - * - * <p>Note that this method does not use anchors, so downstream failures won't - * affect the failure status of any spout tuples.</p> - * + * Emits a tuple directly to the specified task id on the default stream. If the target bolt does not subscribe to this bolt using a direct grouping, the + * tuple will not be sent. If the specified output stream is not declared as direct, or the target bolt subscribes with a non-direct grouping, an error will + * occur at runtime. The emitted values must be immutable. + * + * <p> + * The default stream must be declared as direct in the topology definition. See OutputDeclarer#declare for how this is done when defining topologies in + * Java. + * </p> + * + * <p> + * Note that this method does not use anchors, so downstream failures won't affect the failure status of any spout tuples. 
+ * </p> + * * @param taskId the taskId to send the new tuple to * @param tuple the new output tuple from this bolt */ http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/task/ShellBolt.java b/jstorm-core/src/main/java/backtype/storm/task/ShellBolt.java index d9c8f03..ae7c76f 100755 --- a/jstorm-core/src/main/java/backtype/storm/task/ShellBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/task/ShellBolt.java @@ -40,29 +40,28 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.concurrent.TimeUnit.SECONDS; /** - * A bolt that shells out to another process to process tuples. ShellBolt - * communicates with that process over stdio using a special protocol. An ~100 - * line library is required to implement that protocol, and adapter libraries - * currently exist for Ruby and Python. - * - * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be - * in the resources directory within the jar submitted to the master. - * During development/testing on a local machine, that resources directory just - * needs to be on the classpath.</p> - * - * <p>When creating topologies using the Java API, subclass this bolt and implement - * the IRichBolt interface to create components for the topology that use other languages. For example: + * A bolt that shells out to another process to process tuples. ShellBolt communicates with that process over stdio using a special protocol. An ~100 line + * library is required to implement that protocol, and adapter libraries currently exist for Ruby and Python. + * + * <p> + * To run a ShellBolt on a cluster, the scripts that are shelled out to must be in the resources directory within the jar submitted to the master. 
During + * development/testing on a local machine, that resources directory just needs to be on the classpath. * </p> - * + * + * <p> + * When creating topologies using the Java API, subclass this bolt and implement the IRichBolt interface to create components for the topology that use other + * languages. For example: + * </p> + * * <pre> * public class MyBolt extends ShellBolt implements IRichBolt { - * public MyBolt() { - * super("python", "mybolt.py"); - * } - * - * public void declareOutputFields(OutputFieldsDeclarer declarer) { - * declarer.declare(new Fields("field1", "field2")); - * } + * public MyBolt() { + * super("python", "mybolt.py"); + * } + * + * public void declareOutputFields(OutputFieldsDeclarer declarer) { + * declarer.declare(new Fields("field1", "field2")); + * } * } * </pre> */ @@ -82,7 +81,7 @@ public class ShellBolt implements IBolt { private Thread _readerThread; private Thread _writerThread; - + private TopologyContext _context; private int workerTimeoutMills; @@ -98,11 +97,10 @@ public class ShellBolt implements IBolt { _command = command; } - public void prepare(Map stormConf, TopologyContext context, - final OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, final OutputCollector collector) { Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); if (maxPending != null) { - this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue()); + this._pendingWrites = new LinkedBlockingQueue(((Number) maxPending).intValue()); } _rand = new Random(); _collector = collector; @@ -113,7 +111,7 @@ public class ShellBolt implements IBolt { _process = new ShellProcess(_command); - //subprocesses must send their pid first thing + // subprocesses must send their pid first thing Number subpid = _process.launch(stormConf, context); LOG.info("Launched subprocess with pid " + subpid); @@ -136,14 +134,14 @@ public class ShellBolt implements IBolt { throw new 
RuntimeException(_exception); } - //just need an id + // just need an id String genId = Long.toString(_rand.nextLong()); _inputs.put(genId, input); try { BoltMsg boltMsg = createBoltMessage(input, genId); _pendingWrites.put(boltMsg); - } catch(InterruptedException e) { + } catch (InterruptedException e) { String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); throw new RuntimeException("Error during multilang processing " + processInfo, e); } @@ -170,7 +168,7 @@ public class ShellBolt implements IBolt { private void handleAck(Object id) { Tuple acked = _inputs.remove(id); - if(acked==null) { + if (acked == null) { throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id); } _collector.ack(acked); @@ -178,7 +176,7 @@ public class ShellBolt implements IBolt { private void handleFail(Object id) { Tuple failed = _inputs.remove(id); - if(failed==null) { + if (failed == null) { throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id); } _collector.fail(failed); @@ -201,14 +199,13 @@ public class ShellBolt implements IBolt { } } - if(shellMsg.getTask() == 0) { + if (shellMsg.getTask() == 0) { List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple()); if (shellMsg.areTaskIdsNeeded()) { _pendingWrites.put(outtasks); } } else { - _collector.emitDirect((int) shellMsg.getTask(), - shellMsg.getStream(), anchors, shellMsg.getTuple()); + _collector.emitDirect((int) shellMsg.getTask(), shellMsg.getStream(), anchors, shellMsg.getTuple()); } } @@ -218,46 +215,46 @@ public class ShellBolt implements IBolt { ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel(); switch (logLevel) { - case TRACE: - LOG.trace(msg); - break; - case DEBUG: - LOG.debug(msg); - break; - case INFO: - LOG.info(msg); - break; - case WARN: - LOG.warn(msg); - break; - case ERROR: - LOG.error(msg); - _collector.reportError(new ReportedFailedException(msg)); - break; - default: 
- LOG.info(msg); - break; + case TRACE: + LOG.trace(msg); + break; + case DEBUG: + LOG.debug(msg); + break; + case INFO: + LOG.info(msg); + break; + case WARN: + LOG.warn(msg); + break; + case ERROR: + LOG.error(msg); + _collector.reportError(new ReportedFailedException(msg)); + break; + default: + LOG.info(msg); + break; } } private void handleMetrics(ShellMsg shellMsg) { - //get metric name + // get metric name String name = shellMsg.getMetricName(); if (name.isEmpty()) { throw new RuntimeException("Receive Metrics name is empty"); } - - //get metric by name + + // get metric by name IMetric iMetric = _context.getRegisteredMetricByName(name); if (iMetric == null) { - throw new RuntimeException("Could not find metric by name["+name+"] "); + throw new RuntimeException("Could not find metric by name[" + name + "] "); } - if ( !(iMetric instanceof IShellMetric)) { - throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC"); + if (!(iMetric instanceof IShellMetric)) { + throw new RuntimeException("Metric[" + name + "] is not IShellMetric, can not call by RPC"); } - IShellMetric iShellMetric = (IShellMetric)iMetric; - - //call updateMetricFromRPC with params + IShellMetric iShellMetric = (IShellMetric) iMetric; + + // call updateMetricFromRPC with params Object paramsObj = shellMsg.getMetricParams(); try { iShellMetric.updateMetricFromRPC(paramsObj); @@ -265,7 +262,7 @@ public class ShellBolt implements IBolt { throw re; } catch (Exception e) { throw new RuntimeException(e); - } + } } private void setHeartbeat() { @@ -279,12 +276,10 @@ public class ShellBolt implements IBolt { private void die(Throwable exception) { String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); _exception = new RuntimeException(processInfo, exception); - String message = String.format("Halting process: ShellBolt died. 
Command: %s, ProcessInfo %s", - Arrays.toString(_command), - processInfo); + String message = String.format("Halting process: ShellBolt died. Command: %s, ProcessInfo %s", Arrays.toString(_command), processInfo); LOG.error(message, exception); _collector.reportError(exception); - if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error + if (_running || (exception instanceof Error)) { // don't exit if not running, unless it is an Error System.exit(11); } } @@ -301,8 +296,7 @@ public class ShellBolt implements IBolt { long currentTimeMillis = System.currentTimeMillis(); long lastHeartbeat = getLastHeartbeat(); - LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}", - currentTimeMillis, lastHeartbeat, workerTimeoutMills); + LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}", currentTimeMillis, lastHeartbeat, workerTimeoutMills); if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { bolt.die(new RuntimeException("subprocess heartbeat timeout")); @@ -311,7 +305,6 @@ public class ShellBolt implements IBolt { sendHeartbeatFlag.compareAndSet(false, true); } - } private class BoltReaderRunnable implements Runnable { @@ -326,7 +319,7 @@ public class ShellBolt implements IBolt { } if (command.equals("sync")) { setHeartbeat(); - } else if(command.equals("ack")) { + } else if (command.equals("ack")) { handleAck(shellMsg.getId()); } else if (command.equals("fail")) { handleFail(shellMsg.getId()); @@ -363,7 +356,7 @@ public class ShellBolt implements IBolt { if (write instanceof BoltMsg) { _process.writeBoltMsg((BoltMsg) write); } else if (write instanceof List<?>) { - _process.writeTaskIds((List<Integer>)write); + _process.writeTaskIds((List<Integer>) write); } else if (write != null) { throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); }
