http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java index 1570aeb..aaf6875 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java @@ -36,103 +36,103 @@ import backtype.storm.tuple.Values; public class TestEventLogSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class); - + private static final Map<String, Integer> acked = new HashMap<String, Integer>(); private static final Map<String, Integer> failed = new HashMap<String, Integer>(); - + private String uid; private long totalCount; - + SpoutOutputCollector _collector; private long eventId = 0; private long myCount; private int source; - + public static int getNumAcked(String stormId) { - synchronized(acked) { + synchronized (acked) { return get(acked, stormId, 0); } } public static int getNumFailed(String stormId) { - synchronized(failed) { + synchronized (failed) { return get(failed, stormId, 0); } } - + public TestEventLogSpout(long totalCount) { this.uid = UUID.randomUUID().toString(); - - synchronized(acked) { + + synchronized (acked) { acked.put(uid, 0); } - synchronized(failed) { + synchronized (failed) { failed.put(uid, 0); } - + this.totalCount = totalCount; } - + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; this.source = context.getThisTaskId(); long taskCount = context.getComponentTasks(context.getThisComponentId()).size(); myCount = totalCount / taskCount; } - + public void close() { - + } - + public void cleanup() { - synchronized(acked) { + synchronized (acked) { acked.remove(uid); - } - synchronized(failed) { + } + synchronized (failed) { failed.remove(uid); } } - + public boolean completed() { - + int ackedAmt; int failedAmt; - - synchronized(acked) { + + synchronized (acked) { ackedAmt = acked.get(uid); } - synchronized(failed) { + synchronized (failed) { failedAmt = failed.get(uid); } int totalEmitted = ackedAmt + failedAmt; - + if (totalEmitted >= totalCount) { return true; } return false; } - + public void nextTuple() { - if (eventId < myCount) { + if (eventId < myCount) { eventId++; _collector.emit(new Values(source, eventId), eventId); - } + } } - + public void ack(Object msgId) { - synchronized(acked) { + synchronized (acked) { int curr = get(acked, uid, 0); - acked.put(uid, curr+1); + acked.put(uid, curr + 1); } } public void fail(Object msgId) { - synchronized(failed) { + synchronized (failed) { int curr = get(failed, uid, 0); - failed.put(uid, curr+1); + failed.put(uid, curr + 1); } } - + public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("source", "eventId")); }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java index 1f80362..8286d0b 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java @@ -36,7 +36,7 @@ import backtype.storm.tuple.Values; public class TestEventOrderCheckBolt extends BaseRichBolt { public static Logger LOG = LoggerFactory.getLogger(TestEventOrderCheckBolt.class); - + private int _count; OutputCollector _collector; Map<Integer, Long> recentEventId = new HashMap<Integer, Long>(); @@ -52,8 +52,9 @@ public class TestEventOrderCheckBolt extends BaseRichBolt { Long recentEvent = recentEventId.get(sourceId); if (null != recentEvent && eventId <= recentEvent) { - String error = "Error: event id is not in strict order! event source Id: " - + sourceId + ", last event Id: " + recentEvent + ", current event Id: " + eventId; + String error = + "Error: event id is not in strict order! event source Id: " + sourceId + ", last event Id: " + recentEvent + ", current event Id: " + + eventId; _collector.emit(input, new Values(error)); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java b/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java index 5ef464a..45f48e4 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java @@ -28,7 +28,6 @@ import backtype.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class TestGlobalCount extends BaseRichBolt { public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java b/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java index d41c36a..099a8db 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java @@ -20,10 +20,7 @@ package backtype.storm.testing; import backtype.storm.ILocalCluster; /** - * This is the core interface for the storm java testing, usually - * we put our java unit testing logic in the run method. A sample - * code will be: - * <code> + * This is the core interface for the storm java testing, usually we put our java unit testing logic in the run method. A sample code will be: <code> * Testing.withSimulatedTimeLocalCluster(new TestJob() { * public void run(Cluster cluster) { * // your testing logic here. @@ -31,11 +28,10 @@ import backtype.storm.ILocalCluster; * }); */ public interface TestJob { - /** - * run the testing logic with the cluster. - * - * @param cluster the cluster which created by <code>Testing.withSimulatedTimeLocalCluster</code> - * and <code>Testing.withTrackedCluster</code>. - */ + /** + * run the testing logic with the cluster. + * + * @param cluster the cluster which created by <code>Testing.withSimulatedTimeLocalCluster</code> and <code>Testing.withTrackedCluster</code>. + */ public void run(ILocalCluster cluster) throws Exception; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java index 0d30b26..769f1cf 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java @@ -25,16 +25,15 @@ import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichBolt; - public class TestPlannerBolt extends BaseRichBolt { public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } - + public void execute(Tuple input) { } - + public Fields getOutputFields() { return new Fields("field1", "field2"); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java index f4c27c0..bcacd4d 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java @@ -27,11 +27,10 @@ import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; import java.util.HashMap; - public class TestPlannerSpout extends BaseRichSpout { boolean _isDistributed; Fields _outFields; - + public TestPlannerSpout(Fields outFields, boolean isDistributed) { _isDistributed = isDistributed; _outFields = outFields; @@ -40,34 +39,33 @@ public class TestPlannerSpout extends BaseRichSpout { public TestPlannerSpout(boolean isDistributed) { this(new Fields("field1", "field2"), isDistributed); } - + public TestPlannerSpout(Fields outFields) { this(outFields, true); } - + public Fields getOutputFields() { return _outFields; } - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - + } - + public void close() { - + } - + public void nextTuple() { Utils.sleep(100); } - - public void ack(Object msgId){ - + + public void ack(Object msgId) { + } - public void fail(Object msgId){ - + public void fail(Object msgId) { + } public void declareOutputFields(OutputFieldsDeclarer declarer) { @@ -77,9 +75,9 @@ public class TestPlannerSpout extends BaseRichSpout { @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> ret = new HashMap<String, Object>(); - if(!_isDistributed) { + if (!_isDistributed) { ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); } return ret; - } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java b/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java index 1c7706f..5416ef0 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java @@ -22,12 +22,12 @@ import java.io.Serializable; public class TestSerObject implements Serializable { public int f1; public int f2; - + public TestSerObject(int f1, int f2) { this.f1 = f1; this.f2 = f2; } - + @Override public int hashCode() { final int prime = 31; @@ -36,7 +36,7 @@ public class TestSerObject implements Serializable { result = prime * result + f2; return result; } - + @Override public boolean equals(Object obj) { if (this == obj) @@ -52,5 +52,5 @@ public class TestSerObject implements Serializable { return false; return true; } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java b/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java index 551b054..c6b32b5 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java @@ -29,29 +29,28 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static backtype.storm.utils.Utils.tuple; - public class TestWordCounter extends BaseBasicBolt { public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class); Map<String, Integer> _counts; - + public void prepare(Map stormConf, TopologyContext context) { _counts = new HashMap<String, Integer>(); } - + public void execute(Tuple input, BasicOutputCollector collector) { String word = (String) input.getValues().get(0); int count = 0; - if(_counts.containsKey(word)) { + if (_counts.containsKey(word)) { count = _counts.get(word); } count++; _counts.put(word, count); collector.emit(tuple(word, count)); } - + public void cleanup() { - + } public void declareOutputFields(OutputFieldsDeclarer declarer) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java index 745bf71..d5603a1 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java @@ -31,7 +31,6 @@ import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class TestWordSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); boolean _isDistributed; @@ -44,43 +43,43 @@ public class TestWordSpout extends BaseRichSpout { public TestWordSpout(boolean isDistributed) { _isDistributed = isDistributed; } - + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } - + public void close() { - + } - + public void nextTuple() { Utils.sleep(100); - final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; + final String[] words = new String[] { "nathan", "mike", "jackson", "golda", "bertels" }; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } - + public void ack(Object msgId) { } public void fail(Object msgId) { - + } - + public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { - if(!_isDistributed) { + if (!_isDistributed) { Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } - } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java b/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java index f2691b7..60506b5 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java @@ -23,12 +23,12 @@ import java.util.Map; import backtype.storm.generated.StormTopology; import clojure.lang.Keyword; -public class TrackedTopology extends HashMap{ - public TrackedTopology(Map map) { - super(map); - } - - public StormTopology getTopology() { - return (StormTopology)get(Keyword.intern("topology")); - } +public class TrackedTopology extends HashMap { + public TrackedTopology(Map map) { + super(map); + } + + public StormTopology getTopology() { + return (StormTopology) get(Keyword.intern("topology")); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java index e163576..9635887 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; - public class TupleCaptureBolt implements IRichBolt { public static transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<String, Map<String, List<FixedTuple>>>(); @@ -47,8 +46,8 @@ public class TupleCaptureBolt implements IRichBolt { public void execute(Tuple input) { String component = input.getSourceComponent(); Map<String, List<FixedTuple>> captured = emitted_tuples.get(_name); - if(!captured.containsKey(component)) { - captured.put(component, new ArrayList<FixedTuple>()); + if (!captured.containsKey(component)) { + captured.put(component, new ArrayList<FixedTuple>()); } captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues())); _collector.ack(input); @@ -60,7 +59,7 @@ public class TupleCaptureBolt implements IRichBolt { public void cleanup() { } - + public Map<String, List<FixedTuple>> getAndRemoveResults() { return emitted_tuples.remove(_name); } @@ -70,7 +69,7 @@ public class TupleCaptureBolt implements IRichBolt { emitted_tuples.get(_name).clear(); return ret; } - + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java index 0c67324..a6614fc 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java @@ -36,19 +36,22 @@ public abstract class BaseConfigurationDeclarer<T extends ComponentConfiguration @Override public T setMaxTaskParallelism(Number val) { - if(val!=null) val = val.intValue(); + if (val != null) + val = val.intValue(); return addConfiguration(Config.TOPOLOGY_MAX_TASK_PARALLELISM, val); } @Override public T setMaxSpoutPending(Number val) { - if(val!=null) val = val.intValue(); + if (val != null) + val = val.intValue(); return addConfiguration(Config.TOPOLOGY_MAX_SPOUT_PENDING, val); } - + @Override public T setNumTasks(Number val) { - if(val!=null) val = val.intValue(); + if (val != null) + val = val.intValue(); return addConfiguration(Config.TOPOLOGY_TASKS, val); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java b/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java index 6c9cdc1..ea437c5 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java @@ -25,11 +25,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BasicBoltExecutor implements IRichBolt { - public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class); - + public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class); + private IBasicBolt _bolt; private transient BasicOutputCollector _collector; - + public BasicBoltExecutor(IBasicBolt bolt) { _bolt = bolt; } @@ -38,7 +38,6 @@ public class BasicBoltExecutor implements IRichBolt { _bolt.declareOutputFields(declarer); } - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _bolt.prepare(stormConf, context); _collector = new BasicOutputCollector(collector); @@ -49,8 +48,8 @@ public class BasicBoltExecutor implements IRichBolt { try { _bolt.execute(input, _collector); _collector.getOutputter().ack(input); - } catch(FailedException e) { - if(e instanceof ReportedFailedException) { + } catch (FailedException e) { + if (e instanceof ReportedFailedException) { _collector.reportError(e); } _collector.getOutputter().fail(input); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java index be1c242..e48f159 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java @@ -23,7 +23,6 @@ import backtype.storm.tuple.Tuple; import backtype.storm.utils.Utils; import java.util.List; - public class BasicOutputCollector implements IBasicOutputCollector { private OutputCollector out; private Tuple inputTuple; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java index 0c4b200..8fe05e2 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java @@ -19,8 +19,9 @@ package backtype.storm.topology; /** * BoltDeclarer includes grouping APIs for storm topology. + * * @see <a href="https://storm.apache.org/documentation/Concepts.html">Concepts -Stream groupings-</a> */ public interface BoltDeclarer extends InputDeclarer<BoltDeclarer>, ComponentConfigurationDeclarer<BoltDeclarer> { - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java index d05dda0..49d78e5 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java @@ -21,9 +21,14 @@ import java.util.Map; public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> { T addConfigurations(Map conf); + T addConfiguration(String config, Object value); + T setDebug(boolean debug); + T setMaxTaskParallelism(Number val); + T setMaxSpoutPending(Number val); + T setNumTasks(Number val); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java b/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java index e174b5a..6c26bbf 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java @@ -21,11 +21,11 @@ public class FailedException extends RuntimeException { public FailedException() { super(); } - + public FailedException(String msg) { super(msg); } - + public FailedException(String msg, Throwable cause) { super(msg, cause); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java index 3b24f4e..81741df 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java @@ -23,11 +23,13 @@ import java.util.Map; public interface IBasicBolt extends IComponent { void prepare(Map stormConf, TopologyContext context); + /** * Process the input tuple and optionally emit new tuples based on the input tuple. * * All acking is managed for you. Throw a FailedException if you want to fail the tuple. */ void execute(Tuple input, BasicOutputCollector collector); + void cleanup(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java index 92d60d2..85008c2 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java @@ -21,6 +21,8 @@ import java.util.List; public interface IBasicOutputCollector { List<Integer> emit(String streamId, List<Object> tuple); + void emitDirect(int taskId, String streamId, List<Object> tuple); + void reportError(Throwable t); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java index 560c96f..1d0865d 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java @@ -21,23 +21,21 @@ import java.io.Serializable; import java.util.Map; /** - * Common methods for all possible components in a topology. This interface is used - * when defining topologies using the Java API. + * Common methods for all possible components in a topology. This interface is used when defining topologies using the Java API. */ public interface IComponent extends Serializable { /** * Declare the output schema for all the streams of this topology. - * + * * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream */ void declareOutputFields(OutputFieldsDeclarer declarer); /** - * Declare configuration specific to this component. Only a subset of the "topology.*" configs can - * be overridden. The component configuration can be further overridden when constructing the - * topology using {@link TopologyBuilder} - * + * Declare configuration specific to this component. Only a subset of the "topology.*" configs can be overridden. The component configuration can be further + * overridden when constructing the topology using {@link TopologyBuilder} + * */ Map<String, Object> getComponentConfiguration(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java b/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java deleted file mode 100644 index 3ce9da7..0000000 --- a/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.topology; - -import java.util.Map; - -/* - * This interface is used to notify the update of user configuration - * for bolt and spout - */ -public interface IConfig { - public void updateConf(Map conf); -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java new file mode 100644 index 0000000..573ca99 --- /dev/null +++ b/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java @@ -0,0 +1,13 @@ +package backtype.storm.topology; + +import java.io.Serializable; +import java.util.Map; + +/* + * This interface is used to notify the update of user configuration + * for bolt and spout + */ + +public interface IDynamicComponent extends Serializable { + public void update(Map conf); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java index d35244e..d44619c 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java @@ -20,9 +20,8 @@ package backtype.storm.topology; import backtype.storm.task.IBolt; /** - * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces - * to use to implement components of the topology. - * + * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces to use to implement components of the topology. + * */ public interface IRichBolt extends IBolt, IComponent { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java index b088641..e1bdc02 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java @@ -20,9 +20,8 @@ package backtype.storm.topology; import backtype.storm.spout.ISpout; /** - * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces - * to use to implement components of the topology. - * + * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces to use to implement components of the topology. + * */ public interface IRichSpout extends ISpout, IComponent { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java index edcc0ff..a22acd4 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java @@ -19,7 +19,6 @@ package backtype.storm.topology; import backtype.storm.state.IStateSpout; - public interface IRichStateSpout extends IStateSpout, IComponent { } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java index 33540de..54f2702 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java @@ -22,10 +22,10 @@ import backtype.storm.generated.Grouping; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.tuple.Fields; - public interface InputDeclarer<T extends InputDeclarer> { /** * The stream is partitioned by the fields specified in the grouping. + * * @param componentId * @param fields * @return @@ -34,6 +34,7 @@ public interface InputDeclarer<T extends InputDeclarer> { /** * The stream is partitioned by the fields specified in the grouping. + * * @param componentId * @param streamId * @param fields @@ -42,16 +43,16 @@ public interface InputDeclarer<T extends InputDeclarer> { public T fieldsGrouping(String componentId, String streamId, Fields fields); /** - * The entire stream goes to a single one of the bolt's tasks. - * Specifically, it goes to the task with the lowest id. + * The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id. + * * @param componentId * @return */ public T globalGrouping(String componentId); /** - * The entire stream goes to a single one of the bolt's tasks. - * Specifically, it goes to the task with the lowest id. + * The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id. + * * @param componentId * @param streamId * @return @@ -59,16 +60,16 @@ public interface InputDeclarer<T extends InputDeclarer> { public T globalGrouping(String componentId, String streamId); /** - * Tuples are randomly distributed across the bolt's tasks in a way such that - * each bolt is guaranteed to get an equal number of tuples. + * Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples. + * * @param componentId * @return */ public T shuffleGrouping(String componentId); /** - * Tuples are randomly distributed across the bolt's tasks in a way such that - * each bolt is guaranteed to get an equal number of tuples. + * Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples. + * * @param componentId * @param streamId * @return @@ -76,29 +77,31 @@ public interface InputDeclarer<T extends InputDeclarer> { public T shuffleGrouping(String componentId, String streamId); /** - * If the target bolt has one or more tasks in the same worker process, - * tuples will be shuffled to just those in-process tasks. - * Otherwise, this acts like a normal shuffle grouping. + * If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a + * normal shuffle grouping. + * * @param componentId * @return */ public T localOrShuffleGrouping(String componentId); /** - * If the target bolt has one or more tasks in the same worker process, - * tuples will be shuffled to just those in-process tasks. - * Otherwise, this acts like a normal shuffle grouping. + * If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a + * normal shuffle grouping. + * * @param componentId * @param streamId * @return */ public T localOrShuffleGrouping(String componentId, String streamId); - + public T localFirstGrouping(String componentId); - + public T localFirstGrouping(String componentId, String streamId); + /** * This grouping specifies that you don't care how the stream is grouped. + * * @param componentId * @return */ @@ -106,6 +109,7 @@ public interface InputDeclarer<T extends InputDeclarer> { /** * This grouping specifies that you don't care how the stream is grouped. + * * @param componentId * @param streamId * @return @@ -114,6 +118,7 @@ public interface InputDeclarer<T extends InputDeclarer> { /** * The stream is replicated across all the bolt's tasks. Use this grouping with care. + * * @param componentId * @return */ @@ -121,6 +126,7 @@ public interface InputDeclarer<T extends InputDeclarer> { /** * The stream is replicated across all the bolt's tasks. Use this grouping with care. + * * @param componentId * @param streamId * @return @@ -128,16 +134,16 @@ public interface InputDeclarer<T extends InputDeclarer> { public T allGrouping(String componentId, String streamId); /** - * A stream grouped this way means that the producer of the tuple decides - * which task of the consumer will receive this tuple. + * A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. + * * @param componentId * @return */ public T directGrouping(String componentId); /** - * A stream grouped this way means that the producer of the tuple decides - * which task of the consumer will receive this tuple. + * A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. + * * @param componentId * @param streamId * @return @@ -145,9 +151,9 @@ public interface InputDeclarer<T extends InputDeclarer> { public T directGrouping(String componentId, String streamId); /** - * Tuples are passed to two hashing functions and each target task is - * decided based on the comparison of the state of candidate nodes. - * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf + * Tuples are passed to two hashing functions and each target task is decided based on the comparison of the state of candidate nodes. + * + * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf * @param componentId * @param fields * @return @@ -155,9 +161,9 @@ public interface InputDeclarer<T extends InputDeclarer> { public T partialKeyGrouping(String componentId, Fields fields); /** - * Tuples are passed to two hashing functions and each target task is - * decided based on the comparison of the state of candidate nodes. - * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf + * Tuples are passed to two hashing functions and each target task is decided based on the comparison of the state of candidate nodes. + * + * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf * @param componentId * @param streamId * @param fields @@ -167,6 +173,7 @@ public interface InputDeclarer<T extends InputDeclarer> { /** * A custom stream grouping by implementing the CustomStreamGrouping interface. + * * @param componentId * @param grouping * @return @@ -175,13 +182,14 @@ public interface InputDeclarer<T extends InputDeclarer> { /** * A custom stream grouping by implementing the CustomStreamGrouping interface. + * * @param componentId * @param streamId * @param grouping * @return */ public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping); - + public T grouping(GlobalStreamId id, Grouping grouping); - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java index 2ac4794..d5ca7ca 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java @@ -19,14 +19,15 @@ package backtype.storm.topology; import backtype.storm.tuple.Fields; - public interface OutputFieldsDeclarer { /** * Uses default stream id. */ public void declare(Fields fields); + public void declare(boolean direct, Fields fields); - + public void declareStream(String streamId, Fields fields); + public void declareStream(String streamId, boolean direct, Fields fields); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java index 0e7fd59..1fdcf86 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java @@ -39,13 +39,12 @@ public class OutputFieldsGetter implements OutputFieldsDeclarer { } public void declareStream(String streamId, boolean direct, Fields fields) { - if(_fields.containsKey(streamId)) { + if (_fields.containsKey(streamId)) { throw new IllegalArgumentException("Fields for " + streamId + " already set"); } _fields.put(streamId, new StreamInfo(fields.toList(), direct)); } - public Map<String, StreamInfo> getFieldsDeclaration() { return _fields; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java b/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java index 4e4ebe4..c90a545 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java @@ -21,11 +21,11 @@ public class ReportedFailedException extends FailedException { public ReportedFailedException() { super(); } - + public ReportedFailedException(String msg) { super(msg); } - + public ReportedFailedException(String msg, Throwable cause) { super(msg, cause); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java index c0d8254..9c5ec34 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java @@ -18,5 +18,5 @@ package backtype.storm.topology; public interface SpoutDeclarer extends ComponentConfigurationDeclarer<SpoutDeclarer> { - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java index c04e449..2b546e3 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java @@ -18,108 +18,90 @@ package backtype.storm.topology; import backtype.storm.Config; -import backtype.storm.generated.Bolt; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.ComponentObject; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.generated.Grouping; -import backtype.storm.generated.NullStruct; -import backtype.storm.generated.SpoutSpec; -import backtype.storm.generated.StateSpoutSpec; -import backtype.storm.generated.StormTopology; +import backtype.storm.generated.*; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.grouping.PartialKeyGrouping; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; +import org.json.simple.JSONValue; + import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import org.json.simple.JSONValue; /** - * TopologyBuilder exposes the Java API for specifying a topology for Storm - * to execute. Topologies are Thrift structures in the end, but since the Thrift API - * is so verbose, TopologyBuilder greatly eases the process of creating topologies. - * The template for creating and submitting a topology looks something like: - * + * TopologyBuilder exposes the Java API for specifying a topology for Storm to execute. Topologies are Thrift structures in the end, but since the Thrift API is + * so verbose, TopologyBuilder greatly eases the process of creating topologies. The template for creating and submitting a topology looks something like: + * * <pre> * TopologyBuilder builder = new TopologyBuilder(); - * - * builder.setSpout("1", new TestWordSpout(true), 5); - * builder.setSpout("2", new TestWordSpout(true), 3); - * builder.setBolt("3", new TestWordCounter(), 3) - * .fieldsGrouping("1", new Fields("word")) - * .fieldsGrouping("2", new Fields("word")); - * builder.setBolt("4", new TestGlobalCount()) - * .globalGrouping("1"); - * + * + * builder.setSpout("1", new TestWordSpout(true), 5); + * builder.setSpout("2", new TestWordSpout(true), 3); + * builder.setBolt("3", new TestWordCounter(), 3).fieldsGrouping("1", new Fields("word")).fieldsGrouping("2", new Fields("word")); + * builder.setBolt("4", new TestGlobalCount()).globalGrouping("1"); + * * Map conf = new HashMap(); * conf.put(Config.TOPOLOGY_WORKERS, 4); * - * StormSubmitter.submitTopology("mytopology", conf, builder.createTopology()); + * StormSubmitter.submitTopology("mytopology", conf, builder.createTopology()); * </pre> - * - * Running the exact same topology in local mode (in process), and configuring it to log all tuples - * emitted, looks like the following. Note that it lets the topology run for 10 seconds - * before shutting down the local cluster. - * + * + * Running the exact same topology in local mode (in process), and configuring it to log all tuples emitted, looks like the following. Note that it lets the + * topology run for 10 seconds before shutting down the local cluster. + * * <pre> * TopologyBuilder builder = new TopologyBuilder(); - * - * builder.setSpout("1", new TestWordSpout(true), 5); - * builder.setSpout("2", new TestWordSpout(true), 3); - * builder.setBolt("3", new TestWordCounter(), 3) - * .fieldsGrouping("1", new Fields("word")) - * .fieldsGrouping("2", new Fields("word")); - * builder.setBolt("4", new TestGlobalCount()) - * .globalGrouping("1"); - * + * + * builder.setSpout("1", new TestWordSpout(true), 5); + * builder.setSpout("2", new TestWordSpout(true), 3); + * builder.setBolt("3", new TestWordCounter(), 3).fieldsGrouping("1", new Fields("word")).fieldsGrouping("2", new Fields("word")); + * builder.setBolt("4", new TestGlobalCount()).globalGrouping("1"); + * * Map conf = new HashMap(); * conf.put(Config.TOPOLOGY_WORKERS, 4); * conf.put(Config.TOPOLOGY_DEBUG, true); - * + * * LocalCluster cluster = new LocalCluster(); - * cluster.submitTopology("mytopology", conf, builder.createTopology()); + * cluster.submitTopology("mytopology", conf, builder.createTopology()); * Utils.sleep(10000); * cluster.shutdown(); * </pre> - * - * <p>The pattern for TopologyBuilder is to map component ids to components using the setSpout - * and setBolt methods. Those methods return objects that are then used to declare - * the inputs for that component.</p> + * + * <p> + * The pattern for TopologyBuilder is to map component ids to components using the setSpout and setBolt methods. Those methods return objects that are then used + * to declare the inputs for that component. + * </p> */ public class TopologyBuilder { private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>(); private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>(); private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>(); -// private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>(); + // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>(); private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>(); - - + public StormTopology createTopology() { Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>(); Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>(); - for(String boltId: _bolts.keySet()) { + for (String boltId : _bolts.keySet()) { IRichBolt bolt = _bolts.get(boltId); ComponentCommon common = getComponentCommon(boltId, bolt); boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common)); } - for(String spoutId: _spouts.keySet()) { + for (String spoutId : _spouts.keySet()) { IRichSpout spout = _spouts.get(spoutId); ComponentCommon common = getComponentCommon(spoutId, spout); spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common)); - + } - return new StormTopology(spoutSpecs, - boltSpecs, - new HashMap<String, StateSpoutSpec>()); + return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>()); } /** * Define a new bolt in this topology with parallelism of just one thread. - * + * * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. * @param bolt the bolt * @return use the returned object to declare the inputs to this component @@ -130,10 +112,11 @@ public class TopologyBuilder { /** * Define a new bolt in this topology with the specified amount of parallelism. - * + * * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. * @param bolt the bolt - * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster. + * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around + * the cluster. * @return use the returned object to declare the inputs to this component */ public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) { @@ -144,11 +127,9 @@ public class TopologyBuilder { } /** - * Define a new bolt in this topology. This defines a basic bolt, which is a - * simpler to use but more restricted kind of bolt. Basic bolts are intended - * for non-aggregation processing and automate the anchoring/acking process to - * achieve proper reliability in the topology. - * + * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic bolts are intended for + * non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology. + * * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. * @param bolt the basic bolt * @return use the returned object to declare the inputs to this component @@ -158,14 +139,13 @@ public class TopologyBuilder { } /** - * Define a new bolt in this topology. This defines a basic bolt, which is a - * simpler to use but more restricted kind of bolt. Basic bolts are intended - * for non-aggregation processing and automate the anchoring/acking process to - * achieve proper reliability in the topology. - * + * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic bolts are intended for + * non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology. + * * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. * @param bolt the basic bolt - * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster. + * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around + * the cluster. * @return use the returned object to declare the inputs to this component */ public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) { @@ -174,7 +154,7 @@ public class TopologyBuilder { /** * Define a new spout in this topology. - * + * * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs. * @param spout the spout */ @@ -183,12 +163,12 @@ public class TopologyBuilder { } /** - * Define a new spout in this topology with the specified parallelism. If the spout declares - * itself as non-distributed, the parallelism_hint will be ignored and only one task - * will be allocated to this component. - * + * Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the parallelism_hint will be ignored + * and only one task will be allocated to this component. + * * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs. - * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around the cluster. + * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around + * the cluster. * @param spout the spout */ public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) { @@ -207,51 +187,51 @@ public class TopologyBuilder { // TODO: finish } - private void validateUnusedId(String id) { - if(_bolts.containsKey(id)) { + if (_bolts.containsKey(id)) { throw new IllegalArgumentException("Bolt has already been declared for id " + id); } - if(_spouts.containsKey(id)) { + if (_spouts.containsKey(id)) { throw new IllegalArgumentException("Spout has already been declared for id " + id); } - if(_stateSpouts.containsKey(id)) { + if (_stateSpouts.containsKey(id)) { throw new IllegalArgumentException("State spout has already been declared for id " + id); } } private ComponentCommon getComponentCommon(String id, IComponent component) { ComponentCommon ret = new ComponentCommon(_commons.get(id)); - + OutputFieldsGetter getter = new OutputFieldsGetter(); component.declareOutputFields(getter); ret.set_streams(getter.getFieldsDeclaration()); - return ret; + return ret; } - + private void initCommon(String id, IComponent component, Number parallelism) { ComponentCommon common = new ComponentCommon(); common.set_inputs(new HashMap<GlobalStreamId, Grouping>()); - if(parallelism!=null) { + if (parallelism != null) { common.set_parallelism_hint(parallelism.intValue()); - }else { + } else { common.set_parallelism_hint(1); } Map conf = component.getComponentConfiguration(); - if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf)); + if (conf != null) + common.set_json_conf(JSONValue.toJSONString(conf)); _commons.put(id, common); } protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> { String _id; - + public ConfigGetter(String id) { _id = id; } - + @Override public T addConfigurations(Map conf) { - if(conf!=null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { + if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { throw new IllegalArgumentException("Cannot set serializations for a component using fluent API"); } String currConf = _commons.get(_id).get_json_conf(); @@ -259,13 +239,13 @@ public class TopologyBuilder { return (T) this; } } - + protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer { public SpoutGetter(String id) { super(id); - } + } } - + protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer { private String _boltId; @@ -305,17 +285,17 @@ public class TopologyBuilder { public BoltDeclarer localOrShuffleGrouping(String componentId, String streamId) { return grouping(componentId, streamId, Grouping.local_or_shuffle(new NullStruct())); } - + @Override public BoltDeclarer localFirstGrouping(String componentId) { return localFirstGrouping(componentId, Utils.DEFAULT_STREAM_ID); } - + @Override public BoltDeclarer localFirstGrouping(String componentId, String streamId) { return grouping(componentId, streamId, Grouping.localFirst(new NullStruct())); } - + public BoltDeclarer noneGrouping(String componentId) { return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID); } @@ -368,17 +348,20 @@ public class TopologyBuilder { @Override public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) { return grouping(id.get_componentId(), id.get_streamId(), grouping); - } + } } - + private static Map parseJson(String json) { - if(json==null) return new HashMap(); - else return (Map) JSONValue.parse(json); + if (json == null) + return new HashMap(); + else + return (Map) JSONValue.parse(json); } - + private static String mergeIntoJson(Map into, Map newMap) { Map res = new HashMap(into); - if(newMap!=null) res.putAll(newMap); + if (newMap != null) + res.putAll(newMap); return JSONValue.toJSONString(res); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java index e585ee6..eb13e56 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java @@ -29,5 +29,5 @@ public abstract class BaseBasicBolt extends BaseComponent implements IBasicBolt @Override public void cleanup() { - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java index 3206941..43d21a3 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java @@ -21,5 +21,5 @@ import backtype.storm.coordination.IBatchBolt; import java.util.Map; public abstract class BaseBatchBolt<T> extends BaseComponent implements IBatchBolt<T> { - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java index 8afcdaa..1206abc 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java @@ -24,5 +24,5 @@ public abstract class BaseComponent implements IComponent { @Override public Map<String, Object> getComponentConfiguration() { return null; - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java index 2d20a48..64c3887 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java @@ -19,7 +19,6 @@ package backtype.storm.topology.base; import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout; - public abstract class BaseOpaquePartitionedTransactionalSpout<T> extends BaseComponent implements IOpaquePartitionedTransactionalSpout<T> { - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java index 266736e..ebf31eb 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java @@ -22,5 +22,5 @@ import backtype.storm.topology.IRichBolt; public abstract class BaseRichBolt extends BaseComponent implements IRichBolt { @Override public void cleanup() { - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java index 37513b7..18f1f2c 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java @@ -24,7 +24,7 @@ package backtype.storm.topology.base; import backtype.storm.topology.IRichSpout; /** - * + * * @author nathan */ public abstract class BaseRichSpout extends BaseComponent implements IRichSpout { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java index b6451e9..246b3a3 100755 --- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java @@ -20,5 +20,5 @@ package backtype.storm.topology.base; import backtype.storm.transactional.TransactionAttempt; public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> { - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java index 859bad2..0e91178 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java @@ -18,9 +18,8 @@ package backtype.storm.transactional; /** - * This marks an IBatchBolt within a transactional topology as a committer. This causes the - * finishBatch method to be called in order of the transactions. + * This marks an IBatchBolt within a transactional topology as a committer. This causes the finishBatch method to be called in order of the transactions. */ public interface ICommitter { - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java index 5441ee2..1cd448c 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java @@ -20,12 +20,11 @@ package backtype.storm.transactional; import backtype.storm.task.TopologyContext; import java.util.Map; - public interface ICommitterTransactionalSpout<X> extends ITransactionalSpout<X> { public interface Emitter extends ITransactionalSpout.Emitter { void commit(TransactionAttempt attempt); - } - + } + @Override - public Emitter getEmitter(Map conf, TopologyContext context); + public Emitter getEmitter(Map conf, TopologyContext context); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java index 3207493..528eda7 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java @@ -26,69 +26,62 @@ import java.util.Map; public interface ITransactionalSpout<T> extends IComponent { public interface Coordinator<X> { /** - * Create metadata for this particular transaction id which has never - * been emitted before. The metadata should contain whatever is necessary - * to be able to replay the exact batch for the transaction at a later point. + * Create metadata for this particular transaction id which has never been emitted before. The metadata should contain whatever is necessary to be able + * to replay the exact batch for the transaction at a later point. * * The metadata is stored in Zookeeper. * - * Storm uses the Kryo serializations configured in the component configuration - * for this spout to serialize and deserialize the metadata. + * Storm uses the Kryo serializations configured in the component configuration for this spout to serialize and deserialize the metadata. * * @param txid The id of the transaction. * @param prevMetadata The metadata of the previous transaction * @return the metadata for this new transaction */ X initializeTransaction(BigInteger txid, X prevMetadata); - + /** * Returns true if its ok to emit start a new transaction, false otherwise (will skip this transaction). * - * You should sleep here if you want a delay between asking for the next transaction (this will be called - * repeatedly in a loop). + * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop). */ boolean isReady(); - + /** * Release any resources from this coordinator. */ void close(); } - + public interface Emitter<X> { /** - * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata - * was created by the Coordinator in the initializeTranaction method. This method must always emit - * the same batch of tuples across all tasks for the same transaction id. + * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata was created by the Coordinator in the + * initializeTranaction method. This method must always emit the same batch of tuples across all tasks for the same transaction id. * * The first field of all emitted tuples must contain the provided TransactionAttempt. * */ void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector); - + /** - * Any state for transactions prior to the provided transaction id can be safely cleaned up, so this - * method should clean up that state. + * Any state for transactions prior to the provided transaction id can be safely cleaned up, so this method should clean up that state. */ void cleanupBefore(BigInteger txid); - + /** * Release any resources held by this emitter. */ void close(); } - + /** - * The coordinator for a TransactionalSpout runs in a single thread and indicates when batches - * of tuples should be emitted and when transactions should commit. The Coordinator that you provide - * in a TransactionalSpout provides metadata for each transaction so that the transactions can be replayed. + * The coordinator for a TransactionalSpout runs in a single thread and indicates when batches of tuples should be emitted and when transactions should + * commit. The Coordinator that you provide in a TransactionalSpout provides metadata for each transaction so that the transactions can be replayed. */ Coordinator<T> getCoordinator(Map conf, TopologyContext context); /** - * The emitter for a TransactionalSpout runs as many tasks across the cluster. Emitters are responsible for - * emitting batches of tuples for a transaction and must ensure that the same batch of tuples is always - * emitted for the same transaction id. - */ + * The emitter for a TransactionalSpout runs as many tasks across the cluster. Emitters are responsible for emitting batches of tuples for a transaction and + * must ensure that the same batch of tuples is always emitted for the same transaction id. + */ Emitter<T> getEmitter(Map conf, TopologyContext context); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java index 80bbb0e..e64a2d7 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java @@ -22,22 +22,21 @@ import java.math.BigInteger; public class TransactionAttempt { BigInteger _txid; long _attemptId; - - + // for kryo compatibility public TransactionAttempt() { - + } - + public TransactionAttempt(BigInteger txid, long attemptId) { _txid = txid; _attemptId = attemptId; } - + public BigInteger getTransactionId() { return _txid; } - + public long getAttemptId() { return _attemptId; } @@ -49,7 +48,8 @@ public class TransactionAttempt { @Override public boolean equals(Object o) { - if(!(o instanceof TransactionAttempt)) return false; + if (!(o instanceof TransactionAttempt)) + return false; TransactionAttempt other = (TransactionAttempt) o; return _txid.equals(other._txid) && _attemptId == other._attemptId; } @@ -57,5 +57,5 @@ public class TransactionAttempt { @Override public String toString() { return "" + _txid + ":" + _attemptId; - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java index 53aacae..9bcd75d 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java @@ -31,18 +31,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TransactionalSpoutBatchExecutor implements IRichBolt { - public static Logger LOG = LoggerFactory.getLogger(TransactionalSpoutBatchExecutor.class); + public static Logger LOG = LoggerFactory.getLogger(TransactionalSpoutBatchExecutor.class); BatchOutputCollectorImpl _collector; ITransactionalSpout _spout; ITransactionalSpout.Emitter _emitter; - + TreeMap<BigInteger, TransactionAttempt> _activeTransactions = new TreeMap<BigInteger, TransactionAttempt>(); public TransactionalSpoutBatchExecutor(ITransactionalSpout spout) { _spout = spout; } - + @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = new BatchOutputCollectorImpl(collector); @@ -53,27 +53,27 @@ public class TransactionalSpoutBatchExecutor implements IRichBolt { public void execute(Tuple input) { TransactionAttempt attempt = (TransactionAttempt) input.getValue(0); try { - if(input.getSourceStreamId().equals(TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) { - if(attempt.equals(_activeTransactions.get(attempt.getTransactionId()))) { + if (input.getSourceStreamId().equals(TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) { + if (attempt.equals(_activeTransactions.get(attempt.getTransactionId()))) { ((ICommitterTransactionalSpout.Emitter) _emitter).commit(attempt); _activeTransactions.remove(attempt.getTransactionId()); _collector.ack(input); } else { _collector.fail(input); } - } else { + } else { _emitter.emitBatch(attempt, input.getValue(1), _collector); _activeTransactions.put(attempt.getTransactionId(), attempt); _collector.ack(input); BigInteger committed = (BigInteger) input.getValue(2); - if(committed!=null) { - // valid to delete before what's been committed since + if (committed != null) { + // valid to delete before what's been committed since // those batches will never be accessed again _activeTransactions.headMap(committed).clear(); _emitter.cleanupBefore(committed); } } - } catch(FailedException e) { + } catch (FailedException e) { LOG.warn("Failed to emit batch for transaction", e); _collector.fail(input); }
