This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new b203ae3 STORM-3429: closure: fix all checkstyle warnings new e80da42 Merge pull request #3045 from krichter722/checkstyle-closure b203ae3 is described below commit b203ae379b61117b0f795c3c408b81751cf8d3bc Author: Karl-Philipp Richter <krich...@posteo.de> AuthorDate: Thu Jun 27 23:06:03 2019 +0200 STORM-3429: closure: fix all checkstyle warnings --- storm-clojure/pom.xml | 2 +- .../java/org/apache/storm/clojure/ClojureBolt.java | 77 +++++++------- .../clojure/ClojureSerializationRegister.java | 5 +- .../org/apache/storm/clojure/ClojureSpout.java | 101 ++++++++++--------- .../org/apache/storm/clojure/ClojureTuple.java | 112 ++++++++++++--------- .../java/org/apache/storm/clojure/ClojureUtil.java | 1 + .../apache/storm/clojure/IndifferentAccessMap.java | 53 ++++++---- .../org/apache/storm/clojure/RichShellBolt.java | 14 +-- .../org/apache/storm/clojure/RichShellSpout.java | 14 +-- 9 files changed, 207 insertions(+), 172 deletions(-) diff --git a/storm-clojure/pom.xml b/storm-clojure/pom.xml index f886767..89d200b 100644 --- a/storm-clojure/pom.xml +++ b/storm-clojure/pom.xml @@ -110,7 +110,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>173</maxAllowedViolations> + <maxAllowedViolations>0</maxAllowedViolations> </configuration> </plugin> <plugin> diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java index af371a2..bc37400 100644 --- a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java +++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java @@ -15,8 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.clojure; +import clojure.lang.IFn; +import clojure.lang.Keyword; +import clojure.lang.PersistentArrayMap; +import clojure.lang.RT; +import clojure.lang.Symbol; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,47 +38,43 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; -import clojure.lang.IFn; -import clojure.lang.Keyword; -import clojure.lang.PersistentArrayMap; -import clojure.lang.RT; -import clojure.lang.Symbol; - public class ClojureBolt implements IRichBolt, FinishedCallback { - Map<String, StreamInfo> _fields; - List<String> _fnSpec; - List<String> _confSpec; - List<Object> _params; + Map<String, StreamInfo> fields; + List<String> fnSpec; + List<String> confSpec; + List<Object> params; - IBolt _bolt; + IBolt bolt; public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) { - _fnSpec = fnSpec; - _confSpec = confSpec; - _params = params; - _fields = fields; + this.fnSpec = fnSpec; + this.confSpec = confSpec; + this.params = params; + this.fields = fields; } @Override public void prepare(final Map<String, Object> topoConf, final TopologyContext context, final OutputCollector collector) { - IFn hof = ClojureUtil.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1)); + IFn hof = ClojureUtil.loadClojureFn(fnSpec.get(0), fnSpec.get(1)); try { - IFn preparer = (IFn) hof.applyTo(RT.seq(_params)); - final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] { + IFn preparer = (IFn) hof.applyTo(RT.seq(params)); + final Map<Keyword,Object> collectorMap = new PersistentArrayMap(new Object[] { Keyword.intern(Symbol.create("output-collector")), collector, Keyword.intern(Symbol.create("context")), context}); - List<Object> args = new ArrayList<Object>() {{ - add(topoConf); - add(context); - add(collectorMap); - }}; + List<Object> args = new ArrayList<Object>() { + { + add(topoConf); + add(context); + add(collectorMap); + } + }; - _bolt = (IBolt) preparer.applyTo(RT.seq(args)); + bolt = (IBolt) preparer.applyTo(RT.seq(args)); //this is kind of unnecessary for clojure try { - _bolt.prepare(topoConf, context, collector); - } catch(AbstractMethodError ame) { - + bolt.prepare(topoConf, context, collector); + } catch (AbstractMethodError ame) { + //ignore } } catch (Exception e) { throw new RuntimeException(e); @@ -80,38 +83,38 @@ public class ClojureBolt implements IRichBolt, FinishedCallback { @Override public void execute(Tuple input) { - _bolt.execute(new ClojureTuple(input)); + bolt.execute(new ClojureTuple(input)); } @Override public void cleanup() { try { - _bolt.cleanup(); - } catch(AbstractMethodError ame) { - + bolt.cleanup(); + } catch (AbstractMethodError ame) { + //ignore } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _fields.keySet()) { - StreamInfo info = _fields.get(stream); + for (String stream: fields.keySet()) { + StreamInfo info = fields.get(stream); declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields())); } } @Override public void finishedId(Object id) { - if(_bolt instanceof FinishedCallback) { - ((FinishedCallback) _bolt).finishedId(id); + if (bolt instanceof FinishedCallback) { + ((FinishedCallback) bolt).finishedId(id); } } @Override public Map<String, Object> getComponentConfiguration() { - IFn hof = ClojureUtil.loadClojureFn(_confSpec.get(0), _confSpec.get(1)); + IFn hof = ClojureUtil.loadClojureFn(confSpec.get(0), confSpec.get(1)); try { - return (Map) hof.applyTo(RT.seq(_params)); + return (Map) hof.applyTo(RT.seq(params)); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java index df0840b..fb37b03 100644 --- a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java +++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java @@ -15,13 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.clojure; -import org.apache.storm.serialization.SerializationRegister; +import carbonite.JavaBridge; import com.esotericsoftware.kryo.Kryo; -import carbonite.JavaBridge; +import org.apache.storm.serialization.SerializationRegister; public class ClojureSerializationRegister implements SerializationRegister { diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java index 17399db..f68415f 100644 --- a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java +++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java @@ -15,8 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.clojure; +import clojure.lang.IFn; +import clojure.lang.Keyword; +import clojure.lang.PersistentArrayMap; +import clojure.lang.RT; +import clojure.lang.Symbol; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,48 +36,44 @@ import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; -import clojure.lang.IFn; -import clojure.lang.Keyword; -import clojure.lang.PersistentArrayMap; -import clojure.lang.RT; -import clojure.lang.Symbol; - public class ClojureSpout implements IRichSpout { - Map<String, StreamInfo> _fields; - List<String> _fnSpec; - List<String> _confSpec; - List<Object> _params; + Map<String, StreamInfo> fields; + List<String> fnSpec; + List<String> confSpec; + List<Object> params; - ISpout _spout; + ISpout spout; public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) { - _fnSpec = fnSpec; - _confSpec = confSpec; - _params = params; - _fields = fields; + this.fnSpec = fnSpec; + this.confSpec = confSpec; + this.params = params; + this.fields = fields; } @Override public void open(final Map<String, Object> conf, final TopologyContext context, final SpoutOutputCollector collector) { - IFn hof = ClojureUtil.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1)); + IFn hof = ClojureUtil.loadClojureFn(fnSpec.get(0), fnSpec.get(1)); try { - IFn preparer = (IFn) hof.applyTo(RT.seq(_params)); - final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] { + IFn preparer = (IFn) hof.applyTo(RT.seq(params)); + final Map<Keyword,Object> collectorMap = new PersistentArrayMap(new Object[] { Keyword.intern(Symbol.create("output-collector")), collector, Keyword.intern(Symbol.create("context")), context}); - List<Object> args = new ArrayList<Object>() {{ - add(conf); - add(context); - add(collectorMap); - }}; + List<Object> args = new ArrayList<Object>() { + { + add(conf); + add(context); + add(collectorMap); + } + }; - _spout = (ISpout) preparer.applyTo(RT.seq(args)); + spout = (ISpout) preparer.applyTo(RT.seq(args)); //this is kind of unnecessary for clojure try { - _spout.open(conf, context, collector); - } catch(AbstractMethodError ame) { - + spout.open(conf, context, collector); + } catch (AbstractMethodError ame) { + //ignore } } catch (Exception e) { throw new RuntimeException(e); @@ -80,18 +83,18 @@ public class ClojureSpout implements IRichSpout { @Override public void close() { try { - _spout.close(); - } catch(AbstractMethodError ame) { - + spout.close(); + } catch (AbstractMethodError ame) { + //ignore } } @Override public void nextTuple() { try { - _spout.nextTuple(); - } catch(AbstractMethodError ame) { - + spout.nextTuple(); + } catch (AbstractMethodError ame) { + //ignore } } @@ -99,9 +102,9 @@ public class ClojureSpout implements IRichSpout { @Override public void ack(Object msgId) { try { - _spout.ack(msgId); - } catch(AbstractMethodError ame) { - + spout.ack(msgId); + } catch (AbstractMethodError ame) { + //ignore } } @@ -109,26 +112,26 @@ public class ClojureSpout implements IRichSpout { @Override public void fail(Object msgId) { try { - _spout.fail(msgId); - } catch(AbstractMethodError ame) { - + spout.fail(msgId); + } catch (AbstractMethodError ame) { + //ignore } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _fields.keySet()) { - StreamInfo info = _fields.get(stream); + for (String stream: fields.keySet()) { + StreamInfo info = fields.get(stream); declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields())); } } @Override public Map<String, Object> getComponentConfiguration() { - IFn hof = ClojureUtil.loadClojureFn(_confSpec.get(0), _confSpec.get(1)); + IFn hof = ClojureUtil.loadClojureFn(confSpec.get(0), confSpec.get(1)); try { - return (Map) hof.applyTo(RT.seq(_params)); + return (Map) hof.applyTo(RT.seq(params)); } catch (Exception e) { throw new RuntimeException(e); } @@ -137,18 +140,18 @@ public class ClojureSpout implements IRichSpout { @Override public void activate() { try { - _spout.activate(); - } catch(AbstractMethodError ame) { - + spout.activate(); + } catch (AbstractMethodError ame) { + //ignore } } @Override public void deactivate() { try { - _spout.deactivate(); - } catch(AbstractMethodError ame) { - + spout.deactivate(); + } catch (AbstractMethodError ame) { + //ignore } } } diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java index 7b41b67..3b4789c 100644 --- a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java +++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java @@ -15,16 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.clojure; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.TupleImpl; +package org.apache.storm.clojure; import clojure.lang.AFn; import clojure.lang.ASeq; @@ -45,9 +37,18 @@ import clojure.lang.PersistentArrayMap; import clojure.lang.Seqable; import clojure.lang.Symbol; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; + public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, ILookup, IPersistentMap, Map, IFn { - private IPersistentMap _meta; - private IPersistentMap _map; + private IPersistentMap meta; + private IPersistentMap map; public ClojureTuple(Tuple t) { super(t); @@ -58,32 +59,33 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, } private PersistentArrayMap toMap() { - Object array[] = new Object[size()*2]; + Object[] array = new Object[size() * 2]; List<String> fields = getFields().toList(); - for(int i=0; i < size(); i++) { - array[i*2] = fields.get(i); - array[(i*2)+1] = getValue(i); + for (int i = 0; i < size(); i++) { + array[i * 2] = fields.get(i); + array[(i * 2) + 1] = getValue(i); } return new PersistentArrayMap(array); } public IPersistentMap getMap() { - if (_map == null) { - _map = toMap(); + if (map == null) { + map = toMap(); } - return _map; + return map; } /* ILookup */ @Override public Object valAt(Object o) { try { - if(o instanceof Keyword) { + if (o instanceof Keyword) { return getValueByField(((Keyword) o).getName()); - } else if(o instanceof String) { + } else if (o instanceof String) { return getValueByField((String) o); } - } catch(IllegalArgumentException ignored) { + } catch (IllegalArgumentException ignored) { + //ignore } return null; } @@ -91,14 +93,16 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, @Override public Object valAt(Object o, Object def) { Object ret = valAt(o); - if (ret==null) ret = def; + if (ret == null) { + ret = def; + } return ret; } /* Seqable */ @Override public ISeq seq() { - if(size() > 0) { + if (size() > 0) { return new Seq(getFields().toList(), getValues(), 0); } return null; @@ -108,46 +112,46 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, private static final long serialVersionUID = 1L; final List<String> fields; final List<Object> values; - final int i; + final int count; - Seq(List<String> fields, List<Object> values, int i) { + Seq(List<String> fields, List<Object> values, int count) { this.fields = fields; this.values = values; - assert i >= 0; - this.i = i; + assert count >= 0; + this.count = count; } - public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) { + public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int count) { super(meta); - this.fields= fields; + this.fields = fields; this.values = values; - assert i >= 0; - this.i = i; + assert count >= 0; + this.count = count; } @Override public Object first() { - return new MapEntry(fields.get(i), values.get(i)); + return new MapEntry(fields.get(count), values.get(count)); } @Override public ISeq next() { - if(i+1 < fields.size()) { - return new Seq(fields, values, i+1); + if (count + 1 < fields.size()) { + return new Seq(fields, values, count + 1); } return null; } @Override public int count() { - assert fields.size() -i >= 0 : "index out of bounds"; + assert fields.size() - count >= 0 : "index out of bounds"; // i being the position in the fields of this seq, the remainder of the seq is the size - return fields.size() -i; + return fields.size() - count; } @Override public Obj withMeta(IPersistentMap meta) { - return new Seq(meta, fields, values, i); + return new Seq(meta, fields, values, count); } } @@ -164,7 +168,9 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, @Override public Object nth(int i, Object notfound) { Object ret = nth(i); - if (ret==null) ret = notfound; + if (ret == null) { + ret = notfound; + } return ret; } @@ -177,13 +183,13 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, /* IMeta */ @Override public IPersistentMap meta() { - if(_meta==null) { - _meta = new PersistentArrayMap( new Object[] { + if (meta == null) { + meta = new PersistentArrayMap(new Object[] { makeKeyword("stream"), getSourceStreamId(), makeKeyword("component"), getSourceComponent(), makeKeyword("task"), getSourceTask()}); } - return _meta; + return meta; } /* IFn */ @@ -337,35 +343,41 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, /* Naive implementation, but it might be good enough */ @Override public IPersistentMap assoc(Object k, Object v) { - if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v); - + if (k instanceof Keyword) { + return assoc(((Keyword) k).getName(), v); + } return new IndifferentAccessMap(getMap().assoc(k, v)); } @Override public IPersistentMap assocEx(Object k, Object v) { - if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v); - + if (k instanceof Keyword) { + return assocEx(((Keyword) k).getName(), v); + } return new IndifferentAccessMap(getMap().assocEx(k, v)); } @Override public IPersistentMap without(Object k) { - if(k instanceof Keyword) return without(((Keyword) k).getName()); - + if (k instanceof Keyword) { + return without(((Keyword) k).getName()); + } return new IndifferentAccessMap(getMap().without(k)); } @Override public boolean containsKey(Object k) { - if(k instanceof Keyword) return containsKey(((Keyword) k).getName()); + if (k instanceof Keyword) { + return containsKey(((Keyword) k).getName()); + } return getMap().containsKey(k); } @Override public IMapEntry entryAt(Object k) { - if(k instanceof Keyword) return entryAt(((Keyword) k).getName()); - + if (k instanceof Keyword) { + return entryAt(((Keyword) k).getName()); + } return getMap().entryAt(k); } diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java index 0bfe9fb..d3c3ed2 100644 --- a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java +++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.clojure; import clojure.lang.RT; diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java index 55ba58b..f131fcf 100644 --- a/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java +++ b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java @@ -15,37 +15,37 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.clojure; +package org.apache.storm.clojure; import clojure.lang.ILookup; -import clojure.lang.ISeq; -import clojure.lang.IPersistentMap; -import clojure.lang.PersistentArrayMap; import clojure.lang.IMapEntry; import clojure.lang.IPersistentCollection; +import clojure.lang.IPersistentMap; +import clojure.lang.ISeq; import clojure.lang.Keyword; +import clojure.lang.PersistentArrayMap; + +import java.util.Collection; import java.util.Iterator; import java.util.Map; -import java.util.Collection; import java.util.Set; public class IndifferentAccessMap implements ILookup, IPersistentMap, Map { - protected IPersistentMap _map; - + protected IPersistentMap map; public IndifferentAccessMap(IPersistentMap map) { setMap(map); } public IPersistentMap getMap() { - return _map; + return map; } public IPersistentMap setMap(IPersistentMap map) { - _map = map; - return _map; + this.map = map; + return this.map; } @Override @@ -65,7 +65,7 @@ public class IndifferentAccessMap implements ILookup, IPersistentMap, Map { @Override public Object valAt(Object o) { - if(o instanceof Keyword) { + if (o instanceof Keyword) { return valAt(((Keyword) o).getName()); } return getMap().valAt(o); @@ -74,7 +74,9 @@ public class IndifferentAccessMap implements ILookup, IPersistentMap, Map { @Override public Object valAt(Object o, Object def) { Object ret = valAt(o); - if(ret==null) ret = def; + if (ret == null) { + ret = def; + } return ret; } @@ -82,35 +84,41 @@ public class IndifferentAccessMap implements ILookup, IPersistentMap, Map { /* Naive implementation, but it might be good enough */ @Override public IPersistentMap assoc(Object k, Object v) { - if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v); - + if (k instanceof Keyword) { + return assoc(((Keyword) k).getName(), v); + } return new IndifferentAccessMap(getMap().assoc(k, v)); } @Override public IPersistentMap assocEx(Object k, Object v) { - if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v); - + if (k instanceof Keyword) { + return assocEx(((Keyword) k).getName(), v); + } return new IndifferentAccessMap(getMap().assocEx(k, v)); } @Override public IPersistentMap without(Object k) { - if(k instanceof Keyword) return without(((Keyword) k).getName()); - + if (k instanceof Keyword) { + return without(((Keyword) k).getName()); + } return new IndifferentAccessMap(getMap().without(k)); } @Override public boolean containsKey(Object k) { - if(k instanceof Keyword) return containsKey(((Keyword) k).getName()); + if (k instanceof Keyword) { + return containsKey(((Keyword) k).getName()); + } return getMap().containsKey(k); } @Override public IMapEntry entryAt(Object k) { - if(k instanceof Keyword) return entryAt(((Keyword) k).getName()); - + if (k instanceof Keyword) { + return entryAt(((Keyword) k).getName()); + } return getMap().entryAt(k); } @@ -170,14 +178,17 @@ public class IndifferentAccessMap implements ILookup, IPersistentMap, Map { public void clear() { throw new UnsupportedOperationException(); } + @Override public Object put(Object k, Object v) { throw new UnsupportedOperationException(); } + @Override public void putAll(Map m) { throw new UnsupportedOperationException(); } + @Override public Object remove(Object k) { throw new UnsupportedOperationException(); diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java index 6de5637..86831b1 100644 --- a/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java +++ b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java @@ -15,28 +15,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.clojure; +import java.util.Map; + import org.apache.storm.generated.StreamInfo; import org.apache.storm.task.ShellBolt; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; -import java.util.Map; public class RichShellBolt extends ShellBolt implements IRichBolt { - private Map<String, StreamInfo> _outputs; + private Map<String, StreamInfo> outputs; public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) { super(command); - _outputs = outputs; + this.outputs = outputs; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _outputs.keySet()) { - StreamInfo def = _outputs.get(stream); - if(def.is_direct()) { + for (String stream: outputs.keySet()) { + StreamInfo def = outputs.get(stream); + if (def.is_direct()) { declarer.declareStream(stream, true, new Fields(def.get_output_fields())); } else { declarer.declareStream(stream, new Fields(def.get_output_fields())); diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java index 9fb7e73..84e0567 100644 --- a/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java +++ b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java @@ -15,28 +15,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.clojure; +import java.util.Map; + import org.apache.storm.generated.StreamInfo; import org.apache.storm.spout.ShellSpout; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; -import java.util.Map; public class RichShellSpout extends ShellSpout implements IRichSpout { - private Map<String, StreamInfo> _outputs; + private Map<String, StreamInfo> outputs; public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) { super(command); - _outputs = outputs; + this.outputs = outputs; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _outputs.keySet()) { - StreamInfo def = _outputs.get(stream); - if(def.is_direct()) { + for (String stream: outputs.keySet()) { + StreamInfo def = outputs.get(stream); + if (def.is_direct()) { declarer.declareStream(stream, true, new Fields(def.get_output_fields())); } else { declarer.declareStream(stream, new Fields(def.get_output_fields()));