This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 2ac9e2e STORM-3458: starter: fix all checkstyle warnings new addc1df Merge pull request #3076 from krichter722/checkstyle-starter 2ac9e2e is described below commit 2ac9e2e62a6fbb82bcff94c7015241f8135b6a4b Author: Karl-Philipp Richter <krich...@posteo.de> AuthorDate: Thu Jul 4 23:28:13 2019 +0200 STORM-3458: starter: fix all checkstyle warnings --- examples/storm-starter/pom.xml | 2 +- .../apache/storm/starter/BasicDRPCTopology.java | 1 + .../starter/BlobStoreAPIWordCountTopology.java | 13 ++-- .../apache/storm/starter/ExclamationTopology.java | 8 +-- .../storm/starter/FastWordCountTopology.java | 21 +++--- .../apache/storm/starter/InOrderDeliveryTest.java | 27 ++++---- .../org/apache/storm/starter/LambdaTopology.java | 13 ---- .../jvm/org/apache/storm/starter/ManualDRPC.java | 2 + .../storm/starter/MultipleLoggerTopology.java | 38 +++++------ .../storm/starter/PersistentWindowingTopology.java | 4 +- .../src/jvm/org/apache/storm/starter/Prefix.java | 28 ++++++++ .../org/apache/storm/starter/ReachTopology.java | 74 ++++++++++++---------- .../starter/ResourceAwareExampleTopology.java | 8 +-- .../org/apache/storm/starter/RollingTopWords.java | 11 ++-- .../apache/storm/starter/SingleJoinExample.java | 6 +- .../storm/starter/SkewedRollingTopWords.java | 16 ++--- .../apache/storm/starter/bolt/SingleJoinBolt.java | 60 +++++++++--------- .../storm/starter/bolt/SlidingWindowSumBolt.java | 2 +- .../storm/starter/spout/RandomSentenceSpout.java | 12 ++-- .../storm/starter/streams/AggregateExample.java | 2 +- .../storm/starter/streams/BranchExample.java | 22 +++---- .../streams/GroupByKeyAndWindowExample.java | 2 +- .../apache/storm/starter/streams/JoinExample.java | 6 +- .../storm/starter/streams/StateQueryExample.java | 8 +-- .../storm/starter/streams/StatefulWordCount.java | 8 +-- .../storm/starter/streams/TypedTupleExample.java | 9 +-- .../storm/starter/streams/WindowedWordCount.java | 2 +- .../storm/starter/streams/WordCountToBolt.java | 2 +- .../starter/tools/RankableObjectWithFields.java | 3 +- .../org/apache/storm/starter/tools/Rankings.java | 3 +- .../storm/starter/tools/SlidingWindowCounter.java | 1 + .../storm/starter/tools/SlotBasedCounter.java | 2 - .../storm/starter/trident/DebugMemoryMapState.java | 6 +- .../trident/TridentMinMaxOfDevicesTopology.java | 60 +++++++++--------- .../trident/TridentMinMaxOfVehiclesTopology.java | 39 ++++++------ .../apache/storm/starter/trident/TridentReach.java | 46 ++++++++------ 36 files changed, 300 insertions(+), 267 deletions(-) diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index 97a6610..3835bbc 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -229,7 +229,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>263</maxAllowedViolations> + <maxAllowedViolations>0</maxAllowedViolations> </configuration> </plugin> <plugin> diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java index 6bb787b..70e03a4 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java @@ -29,6 +29,7 @@ import org.apache.storm.utils.DRPCClient; * * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a> */ +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") public class BasicDRPCTopology { public static void main(String[] args) throws Exception { Config conf = new Config(); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java index caa751a..46df78e 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java @@ -54,6 +54,7 @@ import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") public class BlobStoreAPIWordCountTopology { private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class); private static ClientBlobStore store; // Client API to invoke blob store API functionality @@ -70,10 +71,10 @@ public class BlobStoreAPIWordCountTopology { // storm blobstore create --file blacklist.txt --acl o::rwa key private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file) throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException { - String stringBlobACL = "o::rwa"; - AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL); + String stringBlobAcl = "o::rwa"; + AccessControl blobAcl = BlobStoreAclHandler.parseAccessControl(stringBlobAcl); List<AccessControl> acls = new LinkedList<AccessControl>(); - acls.add(blobACL); // more ACLs can be added here + acls.add(blobAcl); // more ACLs can be added here SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls); AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey, settableBlobMeta); blobStream.write(readFile(file).toString().getBytes()); @@ -214,17 +215,17 @@ public class BlobStoreAPIWordCountTopology { // Spout implementation public static class RandomSentenceSpout extends BaseRichSpout { - SpoutOutputCollector _collector; + SpoutOutputCollector collector; @Override public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; + this.collector = collector; } @Override public void nextTuple() { Utils.sleep(100); - _collector.emit(new Values(getRandomSentence())); + collector.emit(new Values(getRandomSentence())); } @Override diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java index 73f2067..22cb2ba 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java @@ -54,17 +54,17 @@ public class ExclamationTopology extends ConfigurableTopology { } public static class ExclamationBolt extends BaseRichBolt { - OutputCollector _collector; + OutputCollector collector; @Override public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - _collector = collector; + this.collector = collector; } @Override public void execute(Tuple tuple) { - _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); - _collector.ack(tuple); + collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); + collector.ack(tuple); } @Override diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java index f881a86..c066d16 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java @@ -43,6 +43,7 @@ import org.apache.storm.utils.Utils; * java. This can show how fast the word count can run. */ public class FastWordCountTopology { + public static void printMetrics(Nimbus.Iface client, String name) throws Exception { ClusterSummary summary = client.getClusterInfo(); String id = null; @@ -80,8 +81,10 @@ public class FastWordCountTopology { } } double avgLatency = weightedAvgTotal / acked; - System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + - (((double) acked) / uptime + " failed: " + failed)); + System.out.println("uptime: " + uptime + + " acked: " + acked + + " avgLatency: " + avgLatency + + " acked/sec: " + (((double) acked) / uptime + " failed: " + failed)); } public static void kill(Nimbus.Iface client, String name) throws Exception { @@ -130,19 +133,19 @@ public class FastWordCountTopology { "this is a test of the emergency broadcast system this is only a test", "peter piper picked a peck of pickeled peppers" }; - SpoutOutputCollector _collector; - Random _rand; + SpoutOutputCollector collector; + Random rand; @Override public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _rand = ThreadLocalRandom.current(); + this.collector = collector; + rand = ThreadLocalRandom.current(); } @Override public void nextTuple() { - String sentence = CHOICES[_rand.nextInt(CHOICES.length)]; - _collector.emit(new Values(sentence), sentence); + String sentence = CHOICES[rand.nextInt(CHOICES.length)]; + collector.emit(new Values(sentence), sentence); } @Override @@ -152,7 +155,7 @@ public class FastWordCountTopology { @Override public void fail(Object id) { - _collector.emit(new Values(id), id); + collector.emit(new Values(id), id); } @Override diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java index 5d314e0..e3cfd25 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java @@ -38,6 +38,7 @@ import org.apache.storm.utils.NimbusClient; import org.apache.storm.utils.Utils; public class InOrderDeliveryTest { + public static void printMetrics(Nimbus.Iface client, String name) throws Exception { ClusterSummary summary = client.getClusterInfo(); String id = null; @@ -75,8 +76,10 @@ public class InOrderDeliveryTest { } } double avgLatency = weightedAvgTotal / acked; - System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + - (((double) acked) / uptime + " failed: " + failed)); + System.out.println("uptime: " + uptime + + " acked: " + acked + + " avgLatency: " + avgLatency + + " acked/sec: " + (((double) acked) / uptime + " failed: " + failed)); } public static void kill(Nimbus.Iface client, String name) throws Exception { @@ -116,21 +119,21 @@ public class InOrderDeliveryTest { } public static class InOrderSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - int _base = 0; - int _i = 0; + SpoutOutputCollector collector; + int base = 0; + int count = 0; @Override public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _base = context.getThisTaskIndex(); + this.collector = collector; + base = context.getThisTaskIndex(); } @Override public void nextTuple() { - Values v = new Values(_base, _i); - _collector.emit(v, "ACK"); - _i++; + Values v = new Values(base, count); + collector.emit(v, "ACK"); + count++; } @Override @@ -157,7 +160,9 @@ public class InOrderDeliveryTest { Integer c1 = tuple.getInteger(0); Integer c2 = tuple.getInteger(1); Integer exp = expected.get(c1); - if (exp == null) exp = 0; + if (exp == null) { + exp = 0; + } if (c2.intValue() != exp.intValue()) { System.out.println(c1 + " " + c2 + " != " + exp); throw new FailedException(c1 + " " + c2 + " != " + exp); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java index 94b1c38..e7c134b 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java @@ -54,16 +54,3 @@ public class LambdaTopology extends ConfigurableTopology { return submit("lambda-demo", conf, builder); } } - -class Prefix implements Serializable { - private String str; - - public Prefix(String str) { - this.str = str; - } - - @Override - public String toString() { - return this.str; - } -} diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java index 2038399..e685ca1 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java @@ -25,7 +25,9 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.DRPCClient; +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") public class ManualDRPC { + public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java index 954e195..9410931 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java @@ -49,37 +49,37 @@ public class MultipleLoggerTopology { } public static class ExclamationLoggingBolt extends BaseRichBolt { - OutputCollector _collector; - Logger _rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + OutputCollector collector; + Logger rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); // ensure the loggers are configured in the worker.xml before // trying to use them here - Logger _logger = LoggerFactory.getLogger("com.myapp"); - Logger _subLogger = LoggerFactory.getLogger("com.myapp.sub"); + Logger logger = LoggerFactory.getLogger("com.myapp"); + Logger subLogger = LoggerFactory.getLogger("com.myapp.sub"); @Override public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - _collector = collector; + this.collector = collector; } @Override public void execute(Tuple tuple) { - _rootLogger.debug("root: This is a DEBUG message"); - _rootLogger.info("root: This is an INFO message"); - _rootLogger.warn("root: This is a WARN message"); - _rootLogger.error("root: This is an ERROR message"); + rootLogger.debug("root: This is a DEBUG message"); + rootLogger.info("root: This is an INFO message"); + rootLogger.warn("root: This is a WARN message"); + rootLogger.error("root: This is an ERROR message"); - _logger.debug("myapp: This is a DEBUG message"); - _logger.info("myapp: This is an INFO message"); - _logger.warn("myapp: This is a WARN message"); - _logger.error("myapp: This is an ERROR message"); + logger.debug("myapp: This is a DEBUG message"); + logger.info("myapp: This is an INFO message"); + logger.warn("myapp: This is a WARN message"); + logger.error("myapp: This is an ERROR message"); - _subLogger.debug("myapp.sub: This is a DEBUG message"); - _subLogger.info("myapp.sub: This is an INFO message"); - _subLogger.warn("myapp.sub: This is a WARN message"); - _subLogger.error("myapp.sub: This is an ERROR message"); + subLogger.debug("myapp.sub: This is a DEBUG message"); + subLogger.info("myapp.sub: This is an INFO message"); + subLogger.warn("myapp.sub: This is a WARN message"); + subLogger.error("myapp.sub: This is an ERROR message"); - _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); - _collector.ack(tuple); + collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); + collector.ack(tuple); } @Override diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java index 46f29a0..62b3ed5 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java @@ -18,6 +18,8 @@ package org.apache.storm.starter; +import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; + import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -38,8 +40,6 @@ import org.apache.storm.windowing.TupleWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; - /** * An example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} with window persistence. * <p> diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/Prefix.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/Prefix.java new file mode 100644 index 0000000..94df975 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/Prefix.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.starter; + +import java.io.Serializable; + +class Prefix implements Serializable { + private String str; + + public Prefix(String str) { + this.str = str; + } + + @Override + public String toString() { + return this.str; + } +} diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java index 1e57a94..5905132 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java @@ -35,35 +35,39 @@ import org.apache.storm.utils.DRPCClient; /** * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation. - * <p/> - * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people + * + * <p>Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower * records. - * <p/> - * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes + * + * <p>This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes * minutes on a single machine into one that takes just a couple seconds. - * <p/> - * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps. + * + * <p>For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps. * * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a> */ public class ReachTopology { - public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{ - put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); - put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); - put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); - }}; - - public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{ - put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); - put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); - put("tim", Arrays.asList("alex")); - put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); - put("adam", Arrays.asList("david", "carissa")); - put("mike", Arrays.asList("john", "bob")); - put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); - }}; + public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() { + { + put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); + put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); + put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); + } + }; + + public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() { + { + put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); + put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); + put("tim", Arrays.asList("alex")); + put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); + put("adam", Arrays.asList("david", "carissa")); + put("mike", Arrays.asList("john", "bob")); + put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); + } + }; public static LinearDRPCTopologyBuilder construct() { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); @@ -132,24 +136,24 @@ public class ReachTopology { } public static class PartialUniquer extends BaseBatchBolt<Object> { - BatchOutputCollector _collector; - Object _id; - Set<String> _followers = new HashSet<String>(); + BatchOutputCollector collector; + Object id; + Set<String> followers = new HashSet<String>(); @Override public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) { - _collector = collector; - _id = id; + this.collector = collector; + this.id = id; } @Override public void execute(Tuple tuple) { - _followers.add(tuple.getString(1)); + followers.add(tuple.getString(1)); } @Override public void finishBatch() { - _collector.emit(new Values(_id, _followers.size())); + collector.emit(new Values(id, followers.size())); } @Override @@ -159,24 +163,24 @@ public class ReachTopology { } public static class CountAggregator extends BaseBatchBolt<Object> { - BatchOutputCollector _collector; - Object _id; - int _count = 0; + BatchOutputCollector collector; + Object id; + int count = 0; @Override public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) { - _collector = collector; - _id = id; + this.collector = collector; + this.id = id; } @Override public void execute(Tuple tuple) { - _count += tuple.getInteger(1); + count += tuple.getInteger(1); } @Override public void finishBatch() { - _collector.emit(new Values(_id, _count)); + collector.emit(new Values(id, count)); } @Override diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java index 18af7ea..20f951e 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java @@ -106,7 +106,7 @@ public class ResourceAwareExampleTopology { private static final ConcurrentHashMap<String, String> myCrummyCache = new ConcurrentHashMap<>(); private static final int CACHE_SIZE = 100_000; - OutputCollector _collector; + OutputCollector collector; protected static String getFromCache(String key) { return myCrummyCache.get(key); @@ -127,7 +127,7 @@ public class ResourceAwareExampleTopology { @Override public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - _collector = collector; + this.collector = collector; } @Override @@ -138,8 +138,8 @@ public class ResourceAwareExampleTopology { ret = orig + "!!!"; addToCache(orig, ret); } - _collector.emit(tuple, new Values(ret)); - _collector.ack(tuple); + collector.emit(tuple, new Values(ret)); + collector.ack(tuple); } @Override diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java index f8ad8c2..ea40e8b 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java @@ -43,15 +43,13 @@ public class RollingTopWords extends ConfigurableTopology { /** * Submits (runs) the topology. * - * Usage: "RollingTopWords [topology-name] [-local]" + * <p>Usage: "RollingTopWords [topology-name] [-local]" * - * By default, the topology is run locally under the name + * <p>By default, the topology is run locally under the name * "slidingWindowCounts". * - * Examples: - * + * <p>Examples: * ``` - * * # Runs in remote/cluster mode, with topology name "production-topology" * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords production-topology ``` * @@ -59,7 +57,6 @@ public class RollingTopWords extends ConfigurableTopology { * First positional argument (optional) is topology name, second * positional argument (optional) defines whether to run the topology * locally ("-local") or remotely, i.e. on a real cluster. - * @throws Exception */ @Override protected int run(String[] args) { @@ -71,11 +68,11 @@ public class RollingTopWords extends ConfigurableTopology { String spoutId = "wordGenerator"; String counterId = "counter"; String intermediateRankerId = "intermediateRanker"; - String totalRankerId = "finalRanker"; builder.setSpout(spoutId, new TestWordSpout(), 5); builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj")); + String totalRankerId = "finalRanker"; builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); LOG.info("Topology name: " + topologyName); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java index 4f4b6b6..ad3ab3d 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java @@ -21,10 +21,10 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.NimbusClient; -/** Example of using a simple custom join bolt - * NOTE: Prefer to use the built-in JoinBolt wherever applicable +/** + * Example of using a simple custom join bolt. + * NOTE: Prefer to use the built-in JoinBolt wherever applicable */ - public class SingleJoinExample { public static void main(String[] args) throws Exception { if (!NimbusClient.isLocalOverride()) { diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java index a173854..a983b7c 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java @@ -46,23 +46,21 @@ public class SkewedRollingTopWords extends ConfigurableTopology { /** * Submits (runs) the topology. * - * Usage: "SkewedRollingTopWords [topology-name] [-local]" + * <p>Usage: "SkewedRollingTopWords [topology-name] [-local]" * - * By default, the topology is run locally under the name + * <p>By default, the topology is run locally under the name * "slidingWindowCounts". * - * Examples: - * - * ``` + * <p>Examples: * + * <p>``` * # Runs in remote/cluster mode, with topology name "production-topology" * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords production-topology ``` * * @param args * First positional argument (optional) is topology name, second * positional argument (optional) defines whether to run the topology - * locally ("-local") or remotely, i.e. on a real cluster. - * @throws Exception + * locally ("-local") or remotely, i.e. on a real cluster */ @Override protected int run(String[] args) { @@ -74,12 +72,12 @@ public class SkewedRollingTopWords extends ConfigurableTopology { String spoutId = "wordGenerator"; String counterId = "counter"; String aggId = "aggregator"; - String intermediateRankerId = "intermediateRanker"; - String totalRankerId = "finalRanker"; builder.setSpout(spoutId, new TestWordSpout(), 5); builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word")); builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj")); + String intermediateRankerId = "intermediateRanker"; builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj")); + String totalRankerId = "finalRanker"; builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); LOG.info("Topology name: " + topologyName); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java index 00a2ab7..5f9b225 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java @@ -28,29 +28,29 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.TimeCacheMap; -/** Example of a simple custom bolt for joining two streams - * NOTE: Prefer to use the built-in JoinBolt wherever applicable +/** + * Example of a simple custom bolt for joining two streams. + * NOTE: Prefer to use the built-in JoinBolt wherever applicable */ - public class SingleJoinBolt extends BaseRichBolt { - OutputCollector _collector; - Fields _idFields; - Fields _outFields; - int _numSources; - TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending; - Map<String, GlobalStreamId> _fieldLocations; + OutputCollector collector; + Fields idFields; + Fields outFields; + int numSources; + TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> pending; + Map<String, GlobalStreamId> fieldLocations; public SingleJoinBolt(Fields outFields) { - _outFields = outFields; + this.outFields = outFields; } @Override public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - _fieldLocations = new HashMap<String, GlobalStreamId>(); - _collector = collector; + fieldLocations = new HashMap<String, GlobalStreamId>(); + this.collector = collector; int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); - _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback()); - _numSources = context.getThisSources().size(); + pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback()); + numSources = context.getThisSources().size(); Set<String> idFields = null; for (GlobalStreamId source : context.getThisSources().keySet()) { Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId()); @@ -61,58 +61,58 @@ public class SingleJoinBolt extends BaseRichBolt { idFields.retainAll(setFields); } - for (String outfield : _outFields) { + for (String outfield : outFields) { for (String sourcefield : fields) { if (outfield.equals(sourcefield)) { - _fieldLocations.put(outfield, source); + fieldLocations.put(outfield, source); } } } } - _idFields = new Fields(new ArrayList<String>(idFields)); + this.idFields = new Fields(new ArrayList<String>(idFields)); - if (_fieldLocations.size() != _outFields.size()) { + if (fieldLocations.size() != outFields.size()) { throw new RuntimeException("Cannot find all outfields among sources"); } } @Override public void execute(Tuple tuple) { - List<Object> id = tuple.select(_idFields); + List<Object> id = tuple.select(idFields); GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()); - if (!_pending.containsKey(id)) { - _pending.put(id, new HashMap<GlobalStreamId, Tuple>()); + if (!pending.containsKey(id)) { + pending.put(id, new HashMap<GlobalStreamId, Tuple>()); } - Map<GlobalStreamId, Tuple> parts = _pending.get(id); + Map<GlobalStreamId, Tuple> parts = pending.get(id); if (parts.containsKey(streamId)) { throw new RuntimeException("Received same side of single join twice"); } parts.put(streamId, tuple); - if (parts.size() == _numSources) { - _pending.remove(id); + if (parts.size() == numSources) { + pending.remove(id); List<Object> joinResult = new ArrayList<Object>(); - for (String outField : _outFields) { - GlobalStreamId loc = _fieldLocations.get(outField); + for (String outField : outFields) { + GlobalStreamId loc = fieldLocations.get(outField); joinResult.add(parts.get(loc).getValueByField(outField)); } - _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult); + collector.emit(new ArrayList<Tuple>(parts.values()), joinResult); for (Tuple part : parts.values()) { - _collector.ack(part); + collector.ack(part); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(_outFields); + declarer.declare(outFields); } private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> { @Override public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) { for (Tuple tuple : tuples.values()) { - _collector.fail(tuple); + collector.fail(tuple); } } } diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SlidingWindowSumBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SlidingWindowSumBolt.java index 7987717..2702181 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SlidingWindowSumBolt.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SlidingWindowSumBolt.java @@ -26,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Computes sliding window sum + * Computes sliding window sum. */ public class SlidingWindowSumBolt extends BaseWindowedBolt { private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java index f6067ae..92af626 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java @@ -29,14 +29,14 @@ import org.slf4j.LoggerFactory; public class RandomSentenceSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); - SpoutOutputCollector _collector; - Random _rand; + SpoutOutputCollector collector; + Random rand; @Override public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _rand = new Random(); + this.collector = collector; + rand = new Random(); } @Override @@ -46,11 +46,11 @@ public class RandomSentenceSpout extends BaseRichSpout { sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature") }; - final String sentence = sentences[_rand.nextInt(sentences.length)]; + final String sentence = sentences[rand.nextInt(sentences.length)]; LOG.debug("Emitting tuple: {}", sentence); - _collector.emit(new Values(sentence)); + collector.emit(new Values(sentence)); } protected String sentence(String input) { diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java index feaff4f..34ffd7c 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java @@ -23,7 +23,7 @@ import org.apache.storm.streams.windowing.TumblingWindows; import org.apache.storm.topology.base.BaseWindowedBolt; /** - * An example that illustrates the global aggregate + * An example that illustrates the global aggregate. */ public class AggregateExample { @SuppressWarnings("unchecked") diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java index d20f071..fc7e74a 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java @@ -33,17 +33,17 @@ public class BranchExample { public static void main(String[] args) throws Exception { StreamBuilder builder = new StreamBuilder(); Stream<Integer>[] evenAndOdd = builder - /* - * Create a stream of random numbers from a spout that - * emits random integers by extracting the tuple value at index 0. - */ - .newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0)) - /* - * Split the stream of numbers into streams of - * even and odd numbers. The first stream contains even - * and the second contains odd numbers. - */ - .branch(x -> (x % 2) == 0, + /* + * Create a stream of random numbers from a spout that + * emits random integers by extracting the tuple value at index 0. + */ + .newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0)) + /* + * Split the stream of numbers into streams of + * even and odd numbers. The first stream contains even + * and the second contains odd numbers. + */ + .branch(x -> (x % 2) == 0, x -> (x % 2) == 1); evenAndOdd[0].forEach(x -> LOG.info("EVEN> " + x)); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java index 6bcf194..02617a2 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java @@ -34,7 +34,7 @@ import org.apache.storm.utils.Utils; /** * An example that shows the usage of {@link PairStream#groupByKeyAndWindow(Window)} - * and {@link PairStream#reduceByKeyAndWindow(Reducer, Window)} + * and {@link PairStream#reduceByKeyAndWindow(Reducer, Window)}. */ public class GroupByKeyAndWindowExample { public static void main(String[] args) throws Exception { diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java index 52bbf28..9016c78 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java @@ -70,7 +70,7 @@ public class JoinExample { private static class NumberSpout extends BaseRichSpout { private final Function<Integer, Integer> function; private SpoutOutputCollector collector; - private int i = 1; + private int count = 1; NumberSpout(Function<Integer, Integer> function) { this.function = function; @@ -84,8 +84,8 @@ public class JoinExample { @Override public void nextTuple() { Utils.sleep(990); - collector.emit(new Values(i, function.apply(i))); - i++; + collector.emit(new Values(count, function.apply(count))); + count++; } @Override diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java index 4e5b1ef..b72b676 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java @@ -31,12 +31,12 @@ import org.apache.storm.utils.Utils; /** * An example that uses {@link Stream#stateQuery(StreamState)} to query the state - * <p> - * You should start a local redis instance before running the 'storm jar' command. By default + * + * <p>You should start a local redis instance before running the 'storm jar' command. By default * the connection will be attempted at localhost:6379. The default * RedisKeyValueStateProvider parameters can be overridden in conf/storm.yaml, for e.g. - * <p/> - * <pre> + * + * <p><pre> * topology.state.provider.config: '{"keyClass":"...", "valueClass":"...", * "keySerializerClass":"...", "valueSerializerClass":"...", * "jedisPoolConfig":{"host":"localhost", "port":6379, diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java index afc345d..39534b3 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java @@ -26,12 +26,12 @@ import org.apache.storm.topology.base.BaseWindowedBolt; /** * A stateful word count that uses {@link PairStream#updateStateByKey(StateUpdater)} to * save the counts in a key value state. This example uses Redis state store. - * <p> - * You should start a local redis instance before running the 'storm jar' command. By default + * + * <p>You should start a local redis instance before running the 'storm jar' command. By default * the connection will be attempted at localhost:6379. The default * RedisKeyValueStateProvider parameters can be overridden in conf/storm.yaml, for e.g. - * <p/> - * <pre> + * + * <p><pre> * topology.state.provider.config: '{"keyClass":"...", "valueClass":"...", * "keySerializerClass":"...", "valueSerializerClass":"...", * "jedisPoolConfig":{"host":"localhost", "port":6379, diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java index e859ca1..481758f 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java @@ -28,12 +28,13 @@ import org.apache.storm.topology.base.BaseWindowedBolt.Count; * An example that illustrates the usage of typed tuples (TupleN<..>) and {@link TupleValueMappers}. */ public class TypedTupleExample { + + /** + * The spout emits sequences of (Integer, Long, Long). TupleValueMapper can be used to extract fields + * from the values and produce a stream of typed tuple (Tuple3<Integer, Long, Long> in this case. + */ public static void main(String[] args) throws Exception { StreamBuilder builder = new StreamBuilder(); - /** - * The spout emits sequences of (Integer, Long, Long). TupleValueMapper can be used to extract fields - * from the values and produce a stream of typed tuple (Tuple3<Integer, Long, Long> in this case. - */ Stream<Tuple3<Integer, Long, Long>> stream = builder.newStream(new RandomIntegerSpout(), TupleValueMappers.of(0, 1, 2)); PairStream<Long, Integer> pairs = stream.mapToPair(t -> Pair.of(t._2 / 10000, t._1)); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java index 2c06674..ef8edd4 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java @@ -23,7 +23,7 @@ import org.apache.storm.streams.windowing.TumblingWindows; import org.apache.storm.topology.base.BaseWindowedBolt.Duration; /** - * A windowed word count example + * A windowed word count example. */ public class WindowedWordCount { public static void main(String[] args) throws Exception { diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java index 082b96a..997e642 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java @@ -27,7 +27,7 @@ import org.apache.storm.tuple.ITuple; /** * An example that computes word counts and finally emits the results to an - * external bolt (sink) + * external bolt (sink). */ public class WordCountToBolt { public static void main(String[] args) throws Exception { diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java index 48bc261..fea5896 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java @@ -74,6 +74,7 @@ public class RankableObjectWithFields implements Rankable, Serializable { } /** + * Get fields. * @return an immutable list of any additional data fields of the object (may be empty but will never be null) */ public List<Object> getFields() { @@ -131,8 +132,6 @@ public class RankableObjectWithFields implements Rankable, Serializable { /** * Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however, * do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields. - * - * @return */ @Override public Rankable copy() { diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java index 85a123b..89bba59 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java @@ -39,7 +39,6 @@ public class Rankings implements Serializable { /** * Copy constructor. - * @param other */ public Rankings(Rankings other) { this(other.maxSize()); @@ -47,6 +46,7 @@ public class Rankings implements Serializable { } /** + * Get max size. * @return the maximum possible number (size) of ranked objects this instance can hold */ public int maxSize() { @@ -54,6 +54,7 @@ public class Rankings implements Serializable { } /** + * Get size. * @return the number (size) of ranked objects this instance is currently holding */ public int size() { diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java index 6aedfc8..4ab8488 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java @@ -95,6 +95,7 @@ public final class SlidingWindowCounter<T> implements Serializable { * @return The current (total) counts of all tracked objects. */ public Map<T, Long> getCountsThenAdvanceWindow() { + @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance") Map<T, Long> counts = objCounter.getCounts(); objCounter.wipeZeros(); objCounter.wipeSlot(tailSlot); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java index 51157b0..6f48c47 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java @@ -75,8 +75,6 @@ public final class SlotBasedCounter<T> implements Serializable { /** * Reset the slot count of any tracked objects to zero for the given slot. - * - * @param slot */ public void wipeSlot(int slot) { for (T obj : objToCounts.keySet()) { diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java index ceac936..3a7aeb0 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java @@ -59,15 +59,15 @@ public class DebugMemoryMapState<T> extends MemoryMapState<T> { } public static class Factory implements StateFactory { - String _id; + String id; public Factory() { - _id = UUID.randomUUID().toString(); + id = UUID.randomUUID().toString(); } @Override public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - return new DebugMemoryMapState(_id + partitionIndex); + return new DebugMemoryMapState(id + partitionIndex); } } } diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java index bde5baa..176326e 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java @@ -41,21 +41,24 @@ public class TridentMinMaxOfDevicesTopology { * generates result stream based on min amd max with device-id and count values. */ public static StormTopology buildDevicesTopology() { - String deviceID = "device-id"; + String deviceId = "device-id"; String count = "count"; - Fields allFields = new Fields(deviceID, count); + Fields allFields = new Fields(deviceId, count); RandomNumberGeneratorSpout spout = new RandomNumberGeneratorSpout(allFields, 10, 1000); TridentTopology topology = new TridentTopology(); - Stream devicesStream = topology.newStream("devicegen-spout", spout). - each(allFields, new Debug("##### devices")); + Stream devicesStream = topology + .newStream("devicegen-spout", spout) + .each(allFields, new Debug("##### devices")); - devicesStream.minBy(deviceID). - each(allFields, new Debug("#### device with min id")); + devicesStream + .minBy(deviceId) + .each(allFields, new Debug("#### device with min id")); - devicesStream.maxBy(count). - each(allFields, new Debug("#### device with max count")); + devicesStream + .maxBy(count) + .each(allFields, new Debug("#### device with max count")); return topology.build(); } @@ -73,28 +76,27 @@ public class TridentMinMaxOfDevicesTopology { spout.setCycle(true); TridentTopology topology = new TridentTopology(); - Stream vehiclesStream = topology.newStream("spout1", spout). - each(allFields, new Debug("##### vehicles")); + Stream vehiclesStream = topology + .newStream("spout1", spout) + .each(allFields, new Debug("##### vehicles")); - Stream slowVehiclesStream = - vehiclesStream + Stream slowVehiclesStream = vehiclesStream .min(new SpeedComparator()) .each(vehicleField, new Debug("#### slowest vehicle")); - Stream slowDriversStream = - slowVehiclesStream + Stream slowDriversStream = slowVehiclesStream .project(driverField) .each(driverField, new Debug("##### slowest driver")); vehiclesStream - .max(new SpeedComparator()) - .each(vehicleField, new Debug("#### fastest vehicle")) - .project(driverField) - .each(driverField, new Debug("##### fastest driver")); + .max(new SpeedComparator()) + .each(vehicleField, new Debug("#### fastest vehicle")) + .project(driverField) + .each(driverField, new Debug("##### fastest driver")); vehiclesStream - .max(new EfficiencyComparator()). - each(vehicleField, new Debug("#### efficient vehicle")); + .max(new EfficiencyComparator()) + .each(vehicleField, new Debug("#### efficient vehicle")); return topology.build(); } @@ -141,10 +143,10 @@ public class TridentMinMaxOfDevicesTopology { @Override public String toString() { - return "Driver{" + - "name='" + name + '\'' + - ", id=" + id + - '}'; + return "Driver{" + + "name='" + name + '\'' + + ", id=" + id + + '}'; } } @@ -176,11 +178,11 @@ public class TridentMinMaxOfDevicesTopology { @Override public String toString() { - return "Vehicle{" + - "name='" + name + '\'' + - ", maxSpeed=" + maxSpeed + - ", efficiency=" + efficiency + - '}'; + return "Vehicle{" + + "name='" + name + '\'' + + ", maxSpeed=" + maxSpeed + + ", efficiency=" + efficiency + + '}'; } } } diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java index f37442b..83e9d62 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java @@ -50,8 +50,9 @@ public class TridentMinMaxOfVehiclesTopology { spout.setCycle(true); TridentTopology topology = new TridentTopology(); - Stream vehiclesStream = topology.newStream("spout1", spout). - each(allFields, new Debug("##### vehicles")); + Stream vehiclesStream = topology + .newStream("spout1", spout) + .each(allFields, new Debug("##### vehicles")); Stream slowVehiclesStream = vehiclesStream @@ -64,18 +65,18 @@ public class TridentMinMaxOfVehiclesTopology { .each(driverField, new Debug("##### slowest driver")); vehiclesStream - .max(new SpeedComparator()) - .each(vehicleField, new Debug("#### fastest vehicle")) - .project(driverField) - .each(driverField, new Debug("##### fastest driver")); + .max(new SpeedComparator()) + .each(vehicleField, new Debug("#### fastest vehicle")) + .project(driverField) + .each(driverField, new Debug("##### fastest driver")); vehiclesStream - .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()). - each(vehicleField, new Debug("#### least efficient vehicle")); + .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) + .each(vehicleField, new Debug("#### least efficient vehicle")); vehiclesStream - .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()). - each(vehicleField, new Debug("#### most efficient vehicle")); + .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) + .each(vehicleField, new Debug("#### most efficient vehicle")); return topology.build(); } @@ -120,10 +121,10 @@ public class TridentMinMaxOfVehiclesTopology { @Override public String toString() { - return "Driver{" + - "name='" + name + '\'' + - ", id=" + id + - '}'; + return "Driver{" + + "name='" + name + '\'' + + ", id=" + id + + '}'; } } @@ -155,11 +156,11 @@ public class TridentMinMaxOfVehiclesTopology { @Override public String toString() { - return "Vehicle{" + - "name='" + name + '\'' + - ", maxSpeed=" + maxSpeed + - ", efficiency=" + efficiency + - '}'; + return "Vehicle{" + + "name='" + name + '\'' + + ", maxSpeed=" + maxSpeed + + ", efficiency=" + efficiency + + '}'; } } } diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java index 19c0665..a159a3e 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java @@ -38,21 +38,25 @@ import org.apache.storm.tuple.Values; import org.apache.storm.utils.DRPCClient; public class TridentReach { - public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{ - put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); - put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); - put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); - }}; - - public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{ - put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); - put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); - put("tim", Arrays.asList("alex")); - put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); - put("adam", Arrays.asList("david", "carissa")); - put("mike", Arrays.asList("john", "bob")); - put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); - }}; + public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() { + { + put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); + put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); + put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); + } + }; + + public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() { + { + put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); + put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); + put("tim", Arrays.asList("alex")); + put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); + put("adam", Arrays.asList("david", "carissa")); + put("mike", Arrays.asList("john", "bob")); + put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); + } + }; public static StormTopology buildTopology() { TridentTopology topology = new TridentTopology(); @@ -83,10 +87,10 @@ public class TridentReach { } public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> { - Map _map; + Map map; public StaticSingleKeyMapState(Map map) { - _map = map; + this.map = map; } @Override @@ -94,21 +98,21 @@ public class TridentReach { List<Object> ret = new ArrayList(); for (List<Object> key : keys) { Object singleKey = key.get(0); - ret.add(_map.get(singleKey)); + ret.add(map.get(singleKey)); } return ret; } public static class Factory implements StateFactory { - Map _map; + Map map; public Factory(Map map) { - _map = map; + this.map = map; } @Override public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - return new StaticSingleKeyMapState(_map); + return new StaticSingleKeyMapState(map); } }