http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java b/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java index 8124651..e8390f6 100755 --- a/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java +++ b/jstorm-core/src/main/java/backtype/storm/task/TopologyContext.java @@ -21,86 +21,76 @@ import backtype.storm.generated.GlobalStreamId; import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; import backtype.storm.hooks.ITaskHook; -import backtype.storm.metric.api.IMetric; -import backtype.storm.metric.api.IReducer; -import backtype.storm.metric.api.ICombiner; -import backtype.storm.metric.api.ReducedMetric; -import backtype.storm.metric.api.CombinedMetric; +import backtype.storm.metric.api.*; import backtype.storm.state.ISubscribedState; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import com.alibaba.jstorm.cluster.StormClusterState; import org.apache.commons.lang.NotImplementedException; import org.json.simple.JSONValue; +import java.util.*; + /** - * A TopologyContext is given to bolts and spouts in their "prepare" and "open" - * methods, respectively. This object provides information about the component's + * A TopologyContext is given to bolts and spouts in their "prepare" and "open" methods, respectively. This object provides information about the component's * place within the topology, such as task ids, inputs and outputs, etc. - * - * <p>The TopologyContext is also used to declare ISubscribedState objects to - * synchronize state with StateSpouts this object is subscribed to.</p> + * + * <p> + * The TopologyContext is also used to declare ISubscribedState objects to synchronize state with StateSpouts this object is subscribed to. + * </p> */ public class TopologyContext extends WorkerTopologyContext implements IMetricsContext { private Integer _taskId; private Map<String, Object> _taskData = new HashMap<String, Object>(); private List<ITaskHook> _hooks = new ArrayList<ITaskHook>(); private Map<String, Object> _executorData; - private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics; + private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics; private clojure.lang.Atom _openOrPrepareWasCalled; - - - public TopologyContext(StormTopology topology, Map stormConf, - Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, - Map<String, Map<String, Fields>> componentToStreamToFields, - String stormId, String codeDir, String pidDir, Integer taskId, - Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, - Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics, - clojure.lang.Atom openOrPrepareWasCalled) { - super(topology, stormConf, taskToComponent, componentToSortedTasks, - componentToStreamToFields, stormId, codeDir, pidDir, - workerPort, workerTasks, defaultResources, userResources); + private StormClusterState _zkCluster; + + public TopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, + Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, + List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, + Map registeredMetrics, clojure.lang.Atom openOrPrepareWasCalled, StormClusterState zkCluster) { + super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks, + defaultResources, userResources); _taskId = taskId; _executorData = executorData; _registeredMetrics = registeredMetrics; _openOrPrepareWasCalled = openOrPrepareWasCalled; + _zkCluster = zkCluster; } /** - * All state from all subscribed state spouts streams will be synced with - * the provided object. - * - * <p>It is recommended that your ISubscribedState object is kept as an instance - * variable of this object. The recommended usage of this method is as follows:</p> - * + * All state from all subscribed state spouts streams will be synced with the provided object. + * + * <p> + * It is recommended that your ISubscribedState object is kept as an instance variable of this object. The recommended usage of this method is as follows: + * </p> + * * <p> * _myState = context.setAllSubscribedState(new MyState()); * </p> + * * @param obj Provided ISubscribedState implementation * @return Returns the ISubscribedState object provided */ public <T extends ISubscribedState> T setAllSubscribedState(T obj) { - //check that only subscribed to one component/stream for statespout - //setsubscribedstate appropriately + // check that only subscribed to one component/stream for statespout + // setsubscribedstate appropriately throw new NotImplementedException(); } - /** - * Synchronizes the default stream from the specified state spout component - * id with the provided ISubscribedState object. - * - * <p>The recommended usage of this method is as follows:</p> + * Synchronizes the default stream from the specified state spout component id with the provided ISubscribedState object. + * + * <p> + * The recommended usage of this method is as follows: + * </p> * <p> * _myState = context.setSubscribedState(componentId, new MyState()); * </p> - * + * * @param componentId the id of the StateSpout component to subscribe to * @param obj Provided ISubscribedState implementation * @return Returns the ISubscribedState object provided @@ -110,14 +100,15 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo } /** - * Synchronizes the specified stream from the specified state spout component - * id with the provided ISubscribedState object. - * - * <p>The recommended usage of this method is as follows:</p> + * Synchronizes the specified stream from the specified state spout component id with the provided ISubscribedState object. + * + * <p> + * The recommended usage of this method is as follows: + * </p> * <p> * _myState = context.setSubscribedState(componentId, streamId, new MyState()); * </p> - * + * * @param componentId the id of the StateSpout component to subscribe to * @param streamId the stream to subscribe to * @param obj Provided ISubscribedState implementation @@ -129,7 +120,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo /** * Gets the task id of this task. - * + * * @return the task id */ public int getThisTaskId() { @@ -137,33 +128,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo } /** - * Gets the component id for this task. The component id maps - * to a component id specified for a Spout or Bolt in the topology definition. + * Gets the component id for this task. The component id maps to a component id specified for a Spout or Bolt in the topology definition. + * * @return */ public String getThisComponentId() { return getComponentId(_taskId); } - /** - * Gets the declared output fields for the specified stream id for the - * component this task is a part of. - */ - public Fields getThisOutputFields(String streamId) { - return getComponentOutputFields(getThisComponentId(), streamId); - } - - /** - * Gets the declared output fields for the specified stream id for the - * component this task is a part of. - */ - public Map<String, List<String>> getThisOutputFieldsForStreams() { - Map<String, List<String>> streamToFields = new HashMap<String, List<String>>(); - for (String stream : this.getThisStreams()) { - streamToFields.put(stream, this.getThisOutputFields(stream).toList()); - } - return streamToFields; - } + /** + * Gets the declared output fields for the specified stream id for the component this task is a part of. + */ + public Fields getThisOutputFields(String streamId) { + return getComponentOutputFields(getThisComponentId(), streamId); + } + + /** + * Gets the declared output fields for the specified stream id for the component this task is a part of. + */ + public Map<String, List<String>> getThisOutputFieldsForStreams() { + Map<String, List<String>> streamToFields = new HashMap<String, List<String>>(); + for (String stream : this.getThisStreams()) { + streamToFields.put(stream, this.getThisOutputFields(stream).toList()); + } + return streamToFields; + } /** * Gets the set of streams declared for the component of this task. @@ -173,15 +162,14 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo } /** - * Gets the index of this task id in getComponentTasks(getThisComponentId()). - * An example use case for this method is determining which task - * accesses which resource in a distributed resource to ensure an even distribution. + * Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which task accesses which + * resource in a distributed resource to ensure an even distribution. */ public int getThisTaskIndex() { List<Integer> tasks = new ArrayList<Integer>(getComponentTasks(getThisComponentId())); Collections.sort(tasks); - for(int i=0; i<tasks.size(); i++) { - if(tasks.get(i) == getThisTaskId()) { + for (int i = 0; i < tasks.size(); i++) { + if (tasks.get(i) == getThisTaskId()) { return i; } } @@ -190,7 +178,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo /** * Gets the declared inputs to this component. - * + * * @return A map from subscribed component/stream to the grouping subscribed with. */ public Map<GlobalStreamId, Grouping> getThisSources() { @@ -199,7 +187,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo /** * Gets information about who is consuming the outputs of this component, and how. - * + * * @return Map from stream id to component id to the Grouping used. */ public Map<String, Map<String, Grouping>> getThisTargets() { @@ -231,15 +219,15 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo return _hooks; } - private static Map<String, Object> groupingToJSONableMap(Grouping grouping) { - Map groupingMap = new HashMap<String, Object>(); - groupingMap.put("type", grouping.getSetField().toString()); - if (grouping.is_set_fields()) { - groupingMap.put("fields", grouping.get_fields()); - } - return groupingMap; - } - + private static Map<String, Object> groupingToJSONableMap(Grouping grouping) { + Map groupingMap = new HashMap<String, Object>(); + groupingMap.put("type", grouping.getSetField().toString()); + if (grouping.is_set_fields()) { + groupingMap.put("fields", grouping.get_fields()); + } + return groupingMap; + } + @Override public String toJSONString() { Map obj = new HashMap(); @@ -253,39 +241,38 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo // Convert targets to a JSON serializable format Map<String, Map> stringTargets = new HashMap<String, Map>(); for (Map.Entry<String, Map<String, Grouping>> entry : this.getThisTargets().entrySet()) { - Map stringTargetMap = new HashMap<String, Object>(); - for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) { - stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue())); - } - stringTargets.put(entry.getKey(), stringTargetMap); + Map stringTargetMap = new HashMap<String, Object>(); + for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) { + stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue())); + } + stringTargets.put(entry.getKey(), stringTargetMap); } obj.put("stream->target->grouping", stringTargets); // Convert sources to a JSON serializable format Map<String, Map<String, Object>> stringSources = new HashMap<String, Map<String, Object>>(); for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) { - GlobalStreamId gid = entry.getKey(); - Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId()); - if (stringSourceMap == null) { - stringSourceMap = new HashMap<String, Object>(); - stringSources.put(gid.get_componentId(), stringSourceMap); - } - stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue())); + GlobalStreamId gid = entry.getKey(); + Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId()); + if (stringSourceMap == null) { + stringSourceMap = new HashMap<String, Object>(); + stringSources.put(gid.get_componentId(), stringSourceMap); + } + stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue())); } obj.put("source->stream->grouping", stringSources); return JSONValue.toJSONString(obj); } /* - * Register a IMetric instance. - * Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs - * and the returned value is sent to all metrics consumers. - * You must call this during IBolt::prepare or ISpout::open. + * Register a IMetric instance. Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs and the returned value is sent to all metrics + * consumers. You must call this during IBolt::prepare or ISpout::open. + * * @return The IMetric argument unchanged. */ public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) { - if((Boolean)_openOrPrepareWasCalled.deref() == true) { - throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " + - "IBolt::prepare() or ISpout::open() method."); + if ((Boolean) _openOrPrepareWasCalled.deref() == true) { + throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " + + "IBolt::prepare() or ISpout::open() method."); } if (metric == null) { @@ -293,27 +280,27 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo } if (timeBucketSizeInSecs <= 0) { - throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " + - "greater than or equal to 1 second."); + throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " + + "greater than or equal to 1 second."); } if (getRegisteredMetricByName(name) != null) { - throw new RuntimeException("The same metric name `" + name + "` was registered twice." ); + throw new RuntimeException("The same metric name `" + name + "` was registered twice."); } Map m1 = _registeredMetrics; - if(!m1.containsKey(timeBucketSizeInSecs)) { + if (!m1.containsKey(timeBucketSizeInSecs)) { m1.put(timeBucketSizeInSecs, new HashMap()); } - Map m2 = (Map)m1.get(timeBucketSizeInSecs); - if(!m2.containsKey(_taskId)) { + Map m2 = (Map) m1.get(timeBucketSizeInSecs); + if (!m2.containsKey(_taskId)) { m2.put(_taskId, new HashMap()); } - Map m3 = (Map)m2.get(_taskId); - if(m3.containsKey(name)) { - throw new RuntimeException("The same metric name `" + name + "` was registered twice." ); + Map m3 = (Map) m2.get(_taskId); + if (m3.containsKey(name)) { + throw new RuntimeException("The same metric name `" + name + "` was registered twice."); } else { m3.put(name, metric); } @@ -322,21 +309,18 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo } /** - * Get component's metric from registered metrics by name. - * Notice: Normally, one component can only register one metric name once. - * But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254) - * cause the same metric name can register twice. - * So we just return the first metric we meet. + * Get component's metric from registered metrics by name. Notice: Normally, one component can only register one metric name once. But now registerMetric + * has a bug(https://issues.apache.org/jira/browse/STORM-254) cause the same metric name can register twice. So we just return the first metric we meet. */ public IMetric getRegisteredMetricByName(String name) { IMetric metric = null; - for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) { + for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric : _registeredMetrics.values()) { Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId); if (nameToMetric != null) { metric = nameToMetric.get(name); if (metric != null) { - //we just return the first metric we meet + // we just return the first metric we meet break; } } @@ -351,10 +335,21 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); } + /* * Convinience method for registering CombinedMetric. */ public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); } + + public StormClusterState getZkCluster() { + return _zkCluster; + } + /* + * Task error report callback + * */ + public void reportError(String errorMsg) throws Exception{ + _zkCluster.report_task_error(getTopologyId(), _taskId, errorMsg, null); + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java b/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java index de407ac..09c8c8c 100755 --- a/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java +++ b/jstorm-core/src/main/java/backtype/storm/task/WorkerTopologyContext.java @@ -27,34 +27,23 @@ import java.util.concurrent.ExecutorService; public class WorkerTopologyContext extends GeneralTopologyContext { public static final String SHARED_EXECUTOR = "executor"; - + private Integer _workerPort; private List<Integer> _workerTasks; private String _codeDir; private String _pidDir; Map<String, Object> _userResources; Map<String, Object> _defaultResources; - - public WorkerTopologyContext( - StormTopology topology, - Map stormConf, - Map<Integer, String> taskToComponent, - Map<String, List<Integer>> componentToSortedTasks, - Map<String, Map<String, Fields>> componentToStreamToFields, - String stormId, - String codeDir, - String pidDir, - Integer workerPort, - List<Integer> workerTasks, - Map<String, Object> defaultResources, - Map<String, Object> userResources - ) { + + public WorkerTopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, + Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, + String pidDir, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources) { super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId); _codeDir = codeDir; _defaultResources = defaultResources; _userResources = userResources; try { - if(pidDir!=null) { + if (pidDir != null) { _pidDir = new File(pidDir).getCanonicalPath(); } else { _pidDir = null; @@ -67,13 +56,12 @@ public class WorkerTopologyContext extends GeneralTopologyContext { } /** - * Gets all the task ids that are running in this worker process - * (including the task for this task). + * Gets all the task ids that are running in this worker process (including the task for this task). */ public List<Integer> getThisWorkerTasks() { return _workerTasks; } - + public Integer getThisWorkerPort() { return _workerPort; } @@ -81,28 +69,27 @@ public class WorkerTopologyContext extends GeneralTopologyContext { public void setThisWorkerTasks(List<Integer> workerTasks) { this._workerTasks = workerTasks; } + /** - * Gets the location of the external resources for this worker on the - * local filesystem. These external resources typically include bolts implemented - * in other languages, such as Ruby or Python. + * Gets the location of the external resources for this worker on the local filesystem. These external resources typically include bolts implemented in + * other languages, such as Ruby or Python. */ public String getCodeDir() { return _codeDir; } /** - * If this task spawns any subprocesses, those subprocesses must immediately - * write their PID to this directory on the local filesystem to ensure that - * Storm properly destroys that process when the worker is shutdown. + * If this task spawns any subprocesses, those subprocesses must immediately write their PID to this directory on the local filesystem to ensure that Storm + * properly destroys that process when the worker is shutdown. */ public String getPIDDir() { return _pidDir; } - + public Object getResource(String name) { return _userResources.get(name); } - + public ExecutorService getSharedExecutor() { return (ExecutorService) _defaultResources.get(SHARED_EXECUTOR); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java b/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java index d65c8bd..fbbcbfc 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/AckFailDelegate.java @@ -21,5 +21,6 @@ import java.io.Serializable; public interface AckFailDelegate extends Serializable { public void ack(Object id); + public void fail(Object id); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java index e16afd8..f3feff3 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/AckFailMapTracker.java @@ -22,31 +22,31 @@ import java.util.HashSet; import java.util.Set; public class AckFailMapTracker implements AckFailDelegate { - + String _acked; String _failed; - + public AckFailMapTracker() { _acked = RegisteredGlobalState.registerState(new HashSet()); _failed = RegisteredGlobalState.registerState(new HashSet()); } - + public boolean isAcked(Object id) { - return ((Set)RegisteredGlobalState.getState(_acked)).contains(id); + return ((Set) RegisteredGlobalState.getState(_acked)).contains(id); } - + public boolean isFailed(Object id) { - return ((Set)RegisteredGlobalState.getState(_failed)).contains(id); + return ((Set) RegisteredGlobalState.getState(_failed)).contains(id); } @Override public void ack(Object id) { - ((Set)RegisteredGlobalState.getState(_acked)).add(id); + ((Set) RegisteredGlobalState.getState(_acked)).add(id); } @Override public void fail(Object id) { - ((Set)RegisteredGlobalState.getState(_failed)).add(id); + ((Set) RegisteredGlobalState.getState(_failed)).add(id); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java index ad80475..10973f1 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/AckTracker.java @@ -24,14 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger; public class AckTracker implements AckFailDelegate { private static Map<String, AtomicInteger> acks = new ConcurrentHashMap<String, AtomicInteger>(); - + private String _id; - + public AckTracker() { _id = UUID.randomUUID().toString(); acks.put(_id, new AtomicInteger(0)); } - + @Override public void ack(Object id) { acks.get(_id).incrementAndGet(); @@ -40,13 +40,13 @@ public class AckTracker implements AckFailDelegate { @Override public void fail(Object id) { } - + public int getNumAcks() { return acks.get(_id).intValue(); } - + public void resetNumAcks() { acks.get(_id).set(0); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java index 26f964a..2565f25 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchNumberList.java @@ -37,16 +37,15 @@ public class BatchNumberList extends BaseBatchBolt { } String _wordComponent; - + public BatchNumberList(String wordComponent) { _wordComponent = wordComponent; } - + String word = null; List<Integer> intSet = new ArrayList<Integer>(); BatchOutputCollector _collector; - - + @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; @@ -54,7 +53,7 @@ public class BatchNumberList extends BaseBatchBolt { @Override public void execute(Tuple tuple) { - if(tuple.getSourceComponent().equals(_wordComponent)) { + if (tuple.getSourceComponent().equals(_wordComponent)) { this.word = tuple.getString(1); } else { intSet.add(tuple.getInteger(1)); @@ -63,10 +62,10 @@ public class BatchNumberList extends BaseBatchBolt { @Override public void finishBatch() { - if(word!=null) { + if (word != null) { Collections.sort(intSet); _collector.emit(new Values(word, intSet)); } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java index 7f3eaf1..819c7c1 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchProcessWord.java @@ -35,5 +35,5 @@ public class BatchProcessWord extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { collector.emit(new Values(input.getValue(0), input.getString(1).length())); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java b/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java index 107f2ed..f5751d1 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/BatchRepeatA.java @@ -24,21 +24,20 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +public class BatchRepeatA extends BaseBasicBolt { -public class BatchRepeatA extends BaseBasicBolt { - @Override public void execute(Tuple input, BasicOutputCollector collector) { - Object id = input.getValue(0); - String word = input.getString(1); - for(int i=0; i<word.length(); i++) { - if(word.charAt(i) == 'a') { + Object id = input.getValue(0); + String word = input.getString(1); + for (int i = 0; i < word.length(); i++) { + if (word.charAt(i) == 'a') { collector.emit("multi", new Values(id, word.substring(0, i))); } } collector.emit("single", new Values(id, word)); } - + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("multi", new Fields("id", "word")); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java index 3fe4e7a..0319ebc 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/BoltTracker.java @@ -22,7 +22,6 @@ import backtype.storm.topology.OutputFieldsDeclarer; import java.util.HashMap; import java.util.Map; - public class BoltTracker extends NonRichBoltTracker implements IRichBolt { IRichBolt _richDelegate; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java b/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java index f3306cf..8483413 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/CompleteTopologyParam.java @@ -23,65 +23,65 @@ import backtype.storm.Config; * The param class for the <code>Testing.completeTopology</code>. */ public class CompleteTopologyParam { - /** - * The mocked spout sources - */ - private MockedSources mockedSources; - /** - * the config for the topology when it was submitted to the cluster - */ - private Config stormConf; - /** - * whether cleanup the state? - */ - private Boolean cleanupState; - /** - * the topology name you want to submit to the cluster - */ - private String topologyName; + /** + * The mocked spout sources + */ + private MockedSources mockedSources; + /** + * the config for the topology when it was submitted to the cluster + */ + private Config stormConf; + /** + * whether cleanup the state? + */ + private Boolean cleanupState; + /** + * the topology name you want to submit to the cluster + */ + private String topologyName; - /** - * the timeout of topology you want to submit to the cluster - */ - private Integer timeoutMs; + /** + * the timeout of topology you want to submit to the cluster + */ + private Integer timeoutMs; - public MockedSources getMockedSources() { - return mockedSources; - } + public MockedSources getMockedSources() { + return mockedSources; + } - public void setMockedSources(MockedSources mockedSources) { - this.mockedSources = mockedSources; - } + public void setMockedSources(MockedSources mockedSources) { + this.mockedSources = mockedSources; + } - public Config getStormConf() { - return stormConf; - } + public Config getStormConf() { + return stormConf; + } - public void setStormConf(Config stormConf) { - this.stormConf = stormConf; - } + public void setStormConf(Config stormConf) { + this.stormConf = stormConf; + } - public Boolean getCleanupState() { - return cleanupState; - } + public Boolean getCleanupState() { + return cleanupState; + } - public void setCleanupState(Boolean cleanupState) { - this.cleanupState = cleanupState; - } + public void setCleanupState(Boolean cleanupState) { + this.cleanupState = cleanupState; + } - public String getTopologyName() { - return topologyName; - } + public String getTopologyName() { + return topologyName; + } - public void setTopologyName(String topologyName) { - this.topologyName = topologyName; - } + public void setTopologyName(String topologyName) { + this.topologyName = topologyName; + } - public Integer getTimeoutMs() { - return timeoutMs; - } + public Integer getTimeoutMs() { + return timeoutMs; + } - public void setTimeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; - } + public void setTimeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java index 882801c..3682120 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/CountingBatchBolt.java @@ -30,7 +30,7 @@ public class CountingBatchBolt extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; int _count = 0; - + @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; @@ -44,12 +44,12 @@ public class CountingBatchBolt extends BaseBatchBolt { @Override public void finishBatch() { - _collector.emit(new Values(_id, _count)); - } + _collector.emit(new Values(_id, _count)); + } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx", "count")); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java index cb8f7e5..a45f16b 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/CountingCommitBolt.java @@ -32,7 +32,7 @@ public class CountingCommitBolt extends BaseTransactionalBolt implements ICommit BatchOutputCollector _collector; TransactionAttempt _id; int _count = 0; - + @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { _id = id; @@ -46,8 +46,8 @@ public class CountingCommitBolt extends BaseTransactionalBolt implements ICommit @Override public void finishBatch() { - _collector.emit(new Values(_id, _count)); - } + _collector.emit(new Values(_id, _count)); + } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java index 1ffb594..52ba5b7 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/FeederSpout.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.List; import java.util.UUID; - public class FeederSpout extends BaseRichSpout { private int _id; private Fields _outFields; @@ -44,15 +43,15 @@ public class FeederSpout extends BaseRichSpout { public void setAckFailDelegate(AckFailDelegate d) { _ackFailDelegate = d; } - + public void feed(List<Object> tuple) { feed(tuple, UUID.randomUUID().toString()); } public void feed(List<Object> tuple, Object msgId) { InprocMessaging.sendMessage(_id, new Values(tuple, msgId)); - } - + } + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } @@ -63,10 +62,10 @@ public class FeederSpout extends BaseRichSpout { public void nextTuple() { List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id); - if(toEmit!=null) { + if (toEmit != null) { List<Object> tuple = (List<Object>) toEmit.get(0); Object msgId = toEmit.get(1); - + _collector.emit(tuple, msgId); } else { try { @@ -78,13 +77,13 @@ public class FeederSpout extends BaseRichSpout { } public void ack(Object msgId) { - if(_ackFailDelegate!=null) { + if (_ackFailDelegate != null) { _ackFailDelegate.ack(msgId); } } public void fail(Object msgId) { - if(_ackFailDelegate!=null) { + if (_ackFailDelegate != null) { _ackFailDelegate.fail(msgId); } } @@ -96,5 +95,5 @@ public class FeederSpout extends BaseRichSpout { @Override public Map<String, Object> getComponentConfiguration() { return new HashMap<String, Object>(); - } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java index 9527803..01fc3e3 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/FixedTupleSpout.java @@ -35,17 +35,17 @@ public class FixedTupleSpout implements IRichSpout { private static final Map<String, Integer> failed = new HashMap<String, Integer>(); public static int getNumAcked(String stormId) { - synchronized(acked) { + synchronized (acked) { return get(acked, stormId, 0); } } public static int getNumFailed(String stormId) { - synchronized(failed) { + synchronized (failed) { return get(failed, stormId, 0); } } - + public static void clear(String stormId) { acked.remove(stormId); failed.remove(stormId); @@ -67,16 +67,16 @@ public class FixedTupleSpout implements IRichSpout { public FixedTupleSpout(List tuples, String fieldName) { _id = UUID.randomUUID().toString(); - synchronized(acked) { + synchronized (acked) { acked.put(_id, 0); } - synchronized(failed) { + synchronized (failed) { failed.put(_id, 0); } _tuples = new ArrayList<FixedTuple>(); - for(Object o: tuples) { + for (Object o : tuples) { FixedTuple ft; - if(o instanceof FixedTuple) { + if (o instanceof FixedTuple) { ft = (FixedTuple) o; } else { ft = new FixedTuple((List) o); @@ -89,25 +89,25 @@ public class FixedTupleSpout implements IRichSpout { public List<FixedTuple> getSourceTuples() { return _tuples; } - + public int getCompleted() { int ackedAmt; int failedAmt; - - synchronized(acked) { + + synchronized (acked) { ackedAmt = acked.get(_id); } - synchronized(failed) { + synchronized (failed) { failedAmt = failed.get(_id); } return ackedAmt + failedAmt; } - + public void cleanup() { - synchronized(acked) { + synchronized (acked) { acked.remove(_id); - } - synchronized(failed) { + } + synchronized (failed) { failed.remove(_id); } } @@ -116,15 +116,15 @@ public class FixedTupleSpout implements IRichSpout { _context = context; List<Integer> tasks = context.getComponentTasks(context.getThisComponentId()); int startIndex; - for(startIndex=0; startIndex<tasks.size(); startIndex++) { - if(tasks.get(startIndex)==context.getThisTaskId()) { + for (startIndex = 0; startIndex < tasks.size(); startIndex++) { + if (tasks.get(startIndex) == context.getThisTaskId()) { break; } } _collector = collector; _pending = new HashMap<String, FixedTuple>(); _serveTuples = new ArrayList<FixedTuple>(); - for(int i=startIndex; i<_tuples.size(); i+=tasks.size()) { + for (int i = startIndex; i < _tuples.size(); i += tasks.size()) { _serveTuples.add(_tuples.get(i)); } } @@ -133,7 +133,7 @@ public class FixedTupleSpout implements IRichSpout { } public void nextTuple() { - if(_serveTuples.size()>0) { + if (_serveTuples.size() > 0) { FixedTuple ft = _serveTuples.remove(0); String id = UUID.randomUUID().toString(); _pending.put(id, ft); @@ -144,16 +144,16 @@ public class FixedTupleSpout implements IRichSpout { } public void ack(Object msgId) { - synchronized(acked) { + synchronized (acked) { int curr = get(acked, _id, 0); - acked.put(_id, curr+1); + acked.put(_id, curr + 1); } } public void fail(Object msgId) { - synchronized(failed) { + synchronized (failed) { int curr = get(failed, _id, 0); - failed.put(_id, curr+1); + failed.put(_id, curr + 1); } } @@ -166,7 +166,7 @@ public class FixedTupleSpout implements IRichSpout { } @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { + public void declareOutputFields(OutputFieldsDeclarer declarer) { if (_fieldName != null) { declarer.declare(new Fields(_fieldName)); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java b/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java index 010336e..5cf8830 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/ForwardingMetricsConsumer.java @@ -51,45 +51,42 @@ public class ForwardingMetricsConsumer implements IMetricsConsumer { @Override public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { - String [] parts = ((String)registrationArgument).split(":",2); + String[] parts = ((String) registrationArgument).split(":", 2); host = parts[0]; port = Integer.valueOf(parts[1]); try { - socket = new Socket(host, port); - out = socket.getOutputStream(); + socket = new Socket(host, port); + out = socket.getOutputStream(); } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException(e); } } @Override public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { StringBuilder sb = new StringBuilder(); - String header = taskInfo.timestamp + "\t" + - taskInfo.srcWorkerHost + ":"+ taskInfo.srcWorkerPort + "\t"+ - taskInfo.srcTaskId + "\t" + taskInfo.srcComponentId + "\t"; + String header = + taskInfo.timestamp + "\t" + taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort + "\t" + taskInfo.srcTaskId + "\t" + taskInfo.srcComponentId + + "\t"; sb.append(header); for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); - sb.append(p.name) - .append("\t") - .append(p.value) - .append("\n"); + sb.append(p.name).append("\t").append(p.value).append("\n"); try { - out.write(sb.toString().getBytes()); - out.flush(); + out.write(sb.toString().getBytes()); + out.flush(); } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException(e); } } } @Override - public void cleanup() { - try { - socket.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } + public void cleanup() { + try { + socket.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java index dcad640..b951d84 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/IdentityBolt.java @@ -25,7 +25,7 @@ import backtype.storm.tuple.Tuple; public class IdentityBolt extends BaseBasicBolt { Fields _fields; - + public IdentityBolt(Fields fields) { _fields = fields; } @@ -38,5 +38,5 @@ public class IdentityBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(_fields); - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java index 1c4d5b3..7e4b32f 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java @@ -32,7 +32,7 @@ public class KeyedCountingBatchBolt extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; Map<Object, Integer> _counts = new HashMap<Object, Integer>(); - + @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; @@ -48,14 +48,14 @@ public class KeyedCountingBatchBolt extends BaseBatchBolt { @Override public void finishBatch() { - for(Object key: _counts.keySet()) { + for (Object key : _counts.keySet()) { _collector.emit(new Values(_id, key, _counts.get(key))); } - } + } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx", "key", "count")); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java index 887eb4e..67225cb 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java @@ -33,7 +33,7 @@ public class KeyedSummingBatchBolt extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; Map<Object, Number> _sums = new HashMap<Object, Number>(); - + @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; @@ -43,19 +43,19 @@ public class KeyedSummingBatchBolt extends BaseBatchBolt { @Override public void execute(Tuple tuple) { Object key = tuple.getValue(1); - Number curr = Utils.get(_sums, key, 0); + Number curr = Utils.get(_sums, key, 0); _sums.put(key, Numbers.add(curr, tuple.getValue(2))); } @Override public void finishBatch() { - for(Object key: _sums.keySet()) { + for (Object key : _sums.keySet()) { _collector.emit(new Values(_id, key, _sums.get(key))); } - } + } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx", "key", "sum")); - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java index 3b492e1..75ad375 100644 --- a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java @@ -35,13 +35,13 @@ import java.util.Map; public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> { public static String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id"; - + private String _id; private String _finishedPartitionsId; private int _takeAmt; private Fields _outFields; private Map<Integer, List<List<Object>>> _initialPartitions; - + public MemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) { _id = RegisteredGlobalState.registerState(partitions); Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>()); @@ -50,17 +50,17 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout< _outFields = outFields; _initialPartitions = partitions; } - + public boolean isExhaustedTuples() { Map<Integer, Boolean> statuses = getFinishedStatuses(); - for(Integer partition: getQueues().keySet()) { - if(!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) { + for (Integer partition : getQueues().keySet()) { + if (!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) { return false; } } return true; } - + class Coordinator implements IPartitionedTransactionalSpout.Coordinator { @Override @@ -71,29 +71,31 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout< @Override public boolean isReady() { return true; - } - + } + @Override public void close() { - } + } } - + class Emitter implements IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> { - + Integer _maxSpoutPending; Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>(); - + public Emitter(Map conf) { Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING); - if(c==null) _maxSpoutPending = 1; - else _maxSpoutPending = Utils.getInt(c); + if (c == null) + _maxSpoutPending = 1; + else + _maxSpoutPending = Utils.getInt(c); } - - + @Override - public MemoryTransactionalSpoutMeta emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) { + public MemoryTransactionalSpoutMeta emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, + MemoryTransactionalSpoutMeta lastPartitionMeta) { int index; - if(lastPartitionMeta==null) { + if (lastPartitionMeta == null) { index = 0; } else { index = lastPartitionMeta.index + lastPartitionMeta.amt; @@ -102,40 +104,40 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout< int total = queue.size(); int left = total - index; int toTake = Math.min(left, _takeAmt); - + MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake); emitPartitionBatch(tx, collector, partition, ret); - if(toTake==0) { + if (toTake == 0) { // this is a pretty hacky way to determine when all the partitions have been committed // wait until we've emitted max-spout-pending empty partitions for the partition int curr = Utils.get(_emptyPartitions, partition, 0) + 1; _emptyPartitions.put(partition, curr); - if(curr > _maxSpoutPending) { + if (curr > _maxSpoutPending) { Map<Integer, Boolean> finishedStatuses = getFinishedStatuses(); // will be null in remote mode - if(finishedStatuses!=null) { + if (finishedStatuses != null) { finishedStatuses.put(partition, true); } } } - return ret; + return ret; } @Override public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta partitionMeta) { List<List<Object>> queue = getQueues().get(partition); - for(int i=partitionMeta.index; i < partitionMeta.index + partitionMeta.amt; i++) { + for (int i = partitionMeta.index; i < partitionMeta.index + partitionMeta.amt; i++) { List<Object> toEmit = new ArrayList<Object>(queue.get(i)); toEmit.add(0, tx); - collector.emit(toEmit); + collector.emit(toEmit); } } - + @Override public void close() { - } - } - + } + } + @Override public IPartitionedTransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) { return new Coordinator(); @@ -159,22 +161,24 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout< conf.registerSerialization(MemoryTransactionalSpoutMeta.class); return conf; } - + public void startup() { getFinishedStatuses().clear(); } - + public void cleanup() { RegisteredGlobalState.clearState(_id); RegisteredGlobalState.clearState(_finishedPartitionsId); } - - private Map<Integer, List<List<Object>>> getQueues() { + + private Map<Integer, List<List<Object>>> getQueues() { Map<Integer, List<List<Object>>> ret = (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id); - if(ret!=null) return ret; - else return _initialPartitions; + if (ret != null) + return ret; + else + return _initialPartitions; } - + private Map<Integer, Boolean> getFinishedStatuses() { return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_finishedPartitionsId); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java index 29681fb..a00788d 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java @@ -20,12 +20,12 @@ package backtype.storm.testing; public class MemoryTransactionalSpoutMeta { int index; int amt; - + // for kryo compatibility public MemoryTransactionalSpoutMeta() { - + } - + public MemoryTransactionalSpoutMeta(int index, int amt) { this.index = index; this.amt = amt; @@ -34,5 +34,5 @@ public class MemoryTransactionalSpoutMeta { @Override public String toString() { return "index: " + index + "; amt: " + amt; - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java b/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java index cd677c8..d325377 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/MkClusterParam.java @@ -23,35 +23,40 @@ import java.util.Map; * The param arg for <code>Testing.withSimulatedTimeCluster</code> and <code>Testing.withTrackedCluster</code> */ public class MkClusterParam { - /** - * count of supervisors for the cluster. - */ - private Integer supervisors; - /** - * count of port for each supervisor - */ - private Integer portsPerSupervisor; - /** - * cluster config - */ - private Map daemonConf; - - public Integer getSupervisors() { - return supervisors; - } - public void setSupervisors(Integer supervisors) { - this.supervisors = supervisors; - } - public Integer getPortsPerSupervisor() { - return portsPerSupervisor; - } - public void setPortsPerSupervisor(Integer portsPerSupervisor) { - this.portsPerSupervisor = portsPerSupervisor; - } - public Map getDaemonConf() { - return daemonConf; - } - public void setDaemonConf(Map daemonConf) { - this.daemonConf = daemonConf; - } + /** + * count of supervisors for the cluster. + */ + private Integer supervisors; + /** + * count of port for each supervisor + */ + private Integer portsPerSupervisor; + /** + * cluster config + */ + private Map daemonConf; + + public Integer getSupervisors() { + return supervisors; + } + + public void setSupervisors(Integer supervisors) { + this.supervisors = supervisors; + } + + public Integer getPortsPerSupervisor() { + return portsPerSupervisor; + } + + public void setPortsPerSupervisor(Integer portsPerSupervisor) { + this.portsPerSupervisor = portsPerSupervisor; + } + + public Map getDaemonConf() { + return daemonConf; + } + + public void setDaemonConf(Map daemonConf) { + this.daemonConf = daemonConf; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java b/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java index 34a8c68..a8a4bdf 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/MkTupleParam.java @@ -21,31 +21,34 @@ import java.util.ArrayList; import java.util.List; public class MkTupleParam { - private String stream; - private String component; - private List<String> fields; - - public String getStream() { - return stream; - } - public void setStream(String stream) { - this.stream = stream; - } - - public String getComponent() { - return component; - } - public void setComponent(String component) { - this.component = component; - } - - public List<String> getFields() { - return fields; - } - public void setFields(String... fields) { - this.fields = new ArrayList<String>(); - for (int i = 0; i < fields.length; i++) { - this.fields.add(fields[i]); - } - } + private String stream; + private String component; + private List<String> fields; + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getComponent() { + return component; + } + + public void setComponent(String component) { + this.component = component; + } + + public List<String> getFields() { + return fields; + } + + public void setFields(String... fields) { + this.fields = new ArrayList<String>(); + for (int i = 0; i < fields.length; i++) { + this.fields.add(fields[i]); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java b/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java index 1fd6b85..48b9ac0 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/MockedSources.java @@ -26,11 +26,11 @@ import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class MockedSources { - /** - * mocked spout sources for the [spout, stream] pair. - */ + /** + * mocked spout sources for the [spout, stream] pair. + */ private Map<String, List<FixedTuple>> data = new HashMap<String, List<FixedTuple>>(); - + /** * add mock data for the spout. * @@ -42,18 +42,18 @@ public class MockedSources { if (!data.containsKey(spoutId)) { data.put(spoutId, new ArrayList<FixedTuple>()); } - + List<FixedTuple> tuples = data.get(spoutId); for (int i = 0; i < valueses.length; i++) { FixedTuple tuple = new FixedTuple(streamId, valueses[i]); tuples.add(tuple); } } - + public void addMockData(String spoutId, Values... valueses) { this.addMockData(spoutId, Utils.DEFAULT_STREAM_ID, valueses); } - + public Map<String, List<FixedTuple>> getData() { return this.data; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java b/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java index 785ed92..9e7363c 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/NGrouping.java @@ -27,17 +27,17 @@ import java.util.List; public class NGrouping implements CustomStreamGrouping { int _n; List<Integer> _outTasks; - + public NGrouping(int n) { _n = n; } - + @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { targetTasks = new ArrayList<Integer>(targetTasks); Collections.sort(targetTasks); _outTasks = new ArrayList<Integer>(); - for(int i=0; i<_n; i++) { + for (int i = 0; i < _n; i++) { _outTasks.add(targetTasks.get(i)); } } @@ -46,5 +46,5 @@ public class NGrouping implements CustomStreamGrouping { public List<Integer> chooseTasks(int taskId, List<Object> values) { return _outTasks; } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java index ccbb67f..b489289 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/NonRichBoltTracker.java @@ -25,7 +25,6 @@ import backtype.storm.utils.RegisteredGlobalState; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - public class NonRichBoltTracker implements IBolt { IBolt _delegate; String _trackId; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java index 1ff01b9..0c91d2b 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java @@ -37,34 +37,34 @@ import java.util.Map; */ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> { public static String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id"; - + private String _id; private String _finishedPartitionsId; private String _disabledId; private int _takeAmt; private Fields _outFields; - + public OpaqueMemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) { _id = RegisteredGlobalState.registerState(partitions); - + Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>()); _finishedPartitionsId = RegisteredGlobalState.registerState(finished); - + Map<Integer, Boolean> disabled = Collections.synchronizedMap(new HashMap<Integer, Boolean>()); _disabledId = RegisteredGlobalState.registerState(disabled); - + _takeAmt = takeAmt; _outFields = outFields; } - + public void setDisabled(Integer partition, boolean disabled) { getDisabledStatuses().put(partition, disabled); } - + public boolean isExhaustedTuples() { Map<Integer, Boolean> statuses = getFinishedStatuses(); - for(Integer partition: getQueues().keySet()) { - if(!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) { + for (Integer partition : getQueues().keySet()) { + if (!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) { return false; } } @@ -80,7 +80,7 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac public IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) { return new Coordinator(); } - + class Coordinator implements IOpaquePartitionedTransactionalSpout.Coordinator { @Override public boolean isReady() { @@ -91,24 +91,26 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac public void close() { } } - + class Emitter implements IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> { - + Integer _maxSpoutPending; Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>(); - + public Emitter(Map conf) { Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING); - if(c==null) _maxSpoutPending = 1; - else _maxSpoutPending = Utils.getInt(c); + if (c == null) + _maxSpoutPending = 1; + else + _maxSpoutPending = Utils.getInt(c); } - - + @Override - public MemoryTransactionalSpoutMeta emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) { - if(!Boolean.FALSE.equals(getDisabledStatuses().get(partition))) { + public MemoryTransactionalSpoutMeta emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, + MemoryTransactionalSpoutMeta lastPartitionMeta) { + if (!Boolean.FALSE.equals(getDisabledStatuses().get(partition))) { int index; - if(lastPartitionMeta==null) { + if (lastPartitionMeta == null) { index = 0; } else { index = lastPartitionMeta.index + lastPartitionMeta.amt; @@ -119,26 +121,26 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac int toTake = Math.min(left, _takeAmt); MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake); - for(int i=ret.index; i < ret.index + ret.amt; i++) { + for (int i = ret.index; i < ret.index + ret.amt; i++) { List<Object> toEmit = new ArrayList<Object>(queue.get(i)); toEmit.add(0, tx); - collector.emit(toEmit); + collector.emit(toEmit); } - if(toTake==0) { + if (toTake == 0) { // this is a pretty hacky way to determine when all the partitions have been committed // wait until we've emitted max-spout-pending empty partitions for the partition int curr = Utils.get(_emptyPartitions, partition, 0) + 1; _emptyPartitions.put(partition, curr); - if(curr > _maxSpoutPending) { + if (curr > _maxSpoutPending) { getFinishedStatuses().put(partition, true); } } - return ret; + return ret; } else { return null; } } - + @Override public void close() { } @@ -147,7 +149,7 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac public int numPartitions() { return getQueues().size(); } - } + } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { @@ -162,20 +164,20 @@ public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransac conf.registerSerialization(MemoryTransactionalSpoutMeta.class); return conf; } - + public void startup() { getFinishedStatuses().clear(); } - + public void cleanup() { RegisteredGlobalState.clearState(_id); RegisteredGlobalState.clearState(_finishedPartitionsId); } - + private Map<Integer, List<List<Object>>> getQueues() { return (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id); } - + private Map<Integer, Boolean> getFinishedStatuses() { return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_finishedPartitionsId); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java index 0bd9833..e9e2a9d 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/PrepareBatchBolt.java @@ -26,10 +26,9 @@ import backtype.storm.utils.Utils; import java.util.ArrayList; import java.util.List; - public class PrepareBatchBolt extends BaseBasicBolt { Fields _outFields; - + public PrepareBatchBolt(Fields outFields) { _outFields = outFields; } @@ -47,6 +46,5 @@ public class PrepareBatchBolt extends BaseBasicBolt { toEmit.addAll(input.getValues()); collector.emit(toEmit); } - - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java index 4b85ce8..fd41283 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsBolt.java @@ -27,23 +27,23 @@ import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt { - private static final long serialVersionUID = 1999209252187463355L; - - public PythonShellMetricsBolt(String[] command) { - super(command); - } - - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - super.prepare(stormConf, context, collector); - - CountShellMetric cMetric = new CountShellMetric(); - context.registerMetric("my-custom-shell-metric", cMetric, 5); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - - public Map<String, Object> getComponentConfiguration() { - return null; - } + private static final long serialVersionUID = 1999209252187463355L; + + public PythonShellMetricsBolt(String[] command) { + super(command); + } + + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + super.prepare(stormConf, context, collector); + + CountShellMetric cMetric = new CountShellMetric(); + context.registerMetric("my-custom-shell-metric", cMetric, 5); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + + public Map<String, Object> getComponentConfiguration() { + return null; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java index 3ccf935..8325fba 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/PythonShellMetricsSpout.java @@ -28,25 +28,25 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout { - private static final long serialVersionUID = 1999209252187463355L; - - public PythonShellMetricsSpout(String[] command) { - super(command); - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - super.open(conf, context, collector); - - CountShellMetric cMetric = new CountShellMetric(); - context.registerMetric("my-custom-shellspout-metric", cMetric, 5); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("field1")); - } - - public Map<String, Object> getComponentConfiguration() { - return null; - } + private static final long serialVersionUID = 1999209252187463355L; + + public PythonShellMetricsSpout(String[] command) { + super(command); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + super.open(conf, context, collector); + + CountShellMetric cMetric = new CountShellMetric(); + context.registerMetric("my-custom-shellspout-metric", cMetric, 5); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("field1")); + } + + public Map<String, Object> getComponentConfiguration() { + return null; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java b/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java index 4d25ac7..cf9dc4d 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/SingleUserSimpleTransport.java @@ -23,15 +23,19 @@ import javax.security.auth.Subject; import java.security.Principal; import java.util.HashSet; - public class SingleUserSimpleTransport extends SimpleTransportPlugin { - @Override - protected Subject getDefaultSubject() { - HashSet<Principal> principals = new HashSet<Principal>(); - principals.add(new Principal() { - public String getName() { return "user"; } - public String toString() { return "user"; } - }); - return new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>()); - } + @Override + protected Subject getDefaultSubject() { + HashSet<Principal> principals = new HashSet<Principal>(); + principals.add(new Principal() { + public String getName() { + return "user"; + } + + public String toString() { + return "user"; + } + }); + return new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>()); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java b/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java index 75ba2b8..369e661 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/SpoutTracker.java @@ -28,13 +28,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - public class SpoutTracker extends BaseRichSpout { IRichSpout _delegate; SpoutTrackOutputCollector _tracker; String _trackId; - private class SpoutTrackOutputCollector implements ISpoutOutputCollector { public int transferred = 0; public int emitted = 0; @@ -43,11 +41,11 @@ public class SpoutTracker extends BaseRichSpout { public SpoutTrackOutputCollector(SpoutOutputCollector collector) { _collector = collector; } - + private void recordSpoutEmit() { Map stats = (Map) RegisteredGlobalState.getState(_trackId); ((AtomicInteger) stats.get("spout-emitted")).incrementAndGet(); - + } public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { @@ -63,11 +61,10 @@ public class SpoutTracker extends BaseRichSpout { @Override public void reportError(Throwable error) { - _collector.reportError(error); + _collector.reportError(error); } } - public SpoutTracker(IRichSpout delegate, String trackId) { _delegate = delegate; _trackId = trackId; @@ -95,7 +92,7 @@ public class SpoutTracker extends BaseRichSpout { public void fail(Object msgId) { _delegate.fail(msgId); Map stats = (Map) RegisteredGlobalState.getState(_trackId); - ((AtomicInteger) stats.get("processed")).incrementAndGet(); + ((AtomicInteger) stats.get("processed")).incrementAndGet(); } public void declareOutputFields(OutputFieldsDeclarer declarer) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java b/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java index e8c0a61..76b6874 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestAggregatesCounter.java @@ -29,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static backtype.storm.utils.Utils.tuple; - public class TestAggregatesCounter extends BaseRichBolt { public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class); @@ -46,8 +45,8 @@ public class TestAggregatesCounter extends BaseRichBolt { int count = (Integer) input.getValues().get(1); _counts.put(word, count); int globalCount = 0; - for(String w: _counts.keySet()) { - globalCount+=_counts.get(w); + for (String w : _counts.keySet()) { + globalCount += _counts.get(w); } _collector.emit(tuple(globalCount)); _collector.ack(input); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java index 5790fb3..634cbe1 100755 --- a/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/testing/TestConfBolt.java @@ -26,7 +26,6 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.Map; - public class TestConfBolt extends BaseBasicBolt { Map<String, Object> _componentConf; Map<String, Object> _conf; @@ -34,16 +33,16 @@ public class TestConfBolt extends BaseBasicBolt { public TestConfBolt() { this(null); } - + public TestConfBolt(Map<String, Object> componentConf) { _componentConf = componentConf; - } + } @Override public void prepare(Map conf, TopologyContext context) { _conf = conf; - } - + } + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("conf", "value")); @@ -58,5 +57,5 @@ public class TestConfBolt extends BaseBasicBolt { @Override public Map<String, Object> getComponentConfiguration() { return _componentConf; - } + } }
