This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 7bf7268 STORM-3450: perf: fix all checkstyle warnings new 67ae024 Merge pull request #3066 from krichter722/checkstyle-perf 7bf7268 is described below commit 7bf7268e25a627882ddccc68fdf58df9889011a5 Author: Karl-Philipp Richter <krich...@posteo.de> AuthorDate: Mon Jul 1 22:18:27 2019 +0200 STORM-3450: perf: fix all checkstyle warnings --- examples/storm-perf/pom.xml | 2 +- .../storm/perf/ConstSpoutIdBoltNullBoltTopo.java | 2 +- .../apache/storm/perf/ConstSpoutNullBoltTopo.java | 13 +- .../org/apache/storm/perf/ConstSpoutOnlyTopo.java | 11 +- .../apache/storm/perf/FileReadWordCountTopo.java | 6 +- .../apache/storm/perf/HdfsSpoutNullBoltTopo.java | 10 +- .../org/apache/storm/perf/KafkaClientHdfsTopo.java | 16 +- .../org/apache/storm/perf/LowThroughputTopo.java | 1 - .../apache/storm/perf/StrGenSpoutHdfsBoltTopo.java | 20 +- .../org/apache/storm/perf/ThroughputMeter.java | 1 + .../org/apache/storm/perf/bolt/DevNullBolt.java | 4 +- .../org/apache/storm/perf/queuetest/Acker.java | 62 ++++++ .../storm/perf/queuetest/AckingProducer.java | 62 ++++++ .../org/apache/storm/perf/queuetest/Consumer.java | 63 ++++++ .../org/apache/storm/perf/queuetest/Forwarder.java | 70 ++++++ .../perf/{ => queuetest}/JCQueuePerfTest.java | 239 +-------------------- .../org/apache/storm/perf/queuetest/MyThread.java | 36 ++++ .../org/apache/storm/perf/queuetest/Producer.java | 44 ++++ .../org/apache/storm/perf/queuetest/Producer2.java | 50 +++++ .../org/apache/storm/perf/spout/WordGenSpout.java | 3 +- .../java/org/apache/storm/perf/toolstest/Cons.java | 61 ++++++ .../perf/{ => toolstest}/JCToolsPerfTest.java | 108 +--------- .../org/apache/storm/perf/toolstest/MyThd.java | 37 ++++ .../java/org/apache/storm/perf/toolstest/Prod.java | 46 ++++ .../org/apache/storm/perf/toolstest/Prod2.java | 46 ++++ .../java/org/apache/storm/perf/utils/Helper.java | 2 +- .../org/apache/storm/perf/utils/MetricsSample.java | 4 +- 27 files 
changed, 627 insertions(+), 392 deletions(-) diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml index 527ef49..07c4b17 100644 --- a/examples/storm-perf/pom.xml +++ b/examples/storm-perf/pom.xml @@ -92,7 +92,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>65</maxAllowedViolations> + <maxAllowedViolations>0</maxAllowedViolations> </configuration> </plugin> <plugin> diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java index a65efe6..c83763d 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java @@ -30,7 +30,7 @@ import org.apache.storm.utils.Utils; /** * ConstSpout -> IdBolt -> DevNullBolt This topology measures speed of messaging between spouts->bolt and bolt->bolt ConstSpout : - * Continuously emits a constant string IdBolt : clones and emits input tuples DevNullBolt : discards incoming tuples + * Continuously emits a constant string IdBolt : clones and emits input tuples DevNullBolt : discards incoming tuples. 
*/ public class ConstSpoutIdBoltNullBoltTopo { diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java index 9683e08..63ef51b 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java @@ -28,12 +28,13 @@ import org.apache.storm.topology.BoltDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; -/*** - * This topo helps measure the messaging peak throughput between a spout and a bolt. - * Spout generates a stream of a fixed string. - * Bolt will simply ack and discard the tuple received +/** + * This topo helps measure the messaging peak throughput between a spout and a bolt. + * + * <p>Spout generates a stream of a fixed string. + * + * <p>Bolt will simply ack and discard the tuple received. */ - public class ConstSpoutNullBoltTopo { public static final String TOPOLOGY_NAME = "ConstSpoutNullBoltTopo"; @@ -79,7 +80,7 @@ public class ConstSpoutNullBoltTopo { } /** - * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle) + * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle). 
*/ public static void main(String[] args) throws Exception { int runTime = -1; diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java index 5848cbc..1b012fe 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java @@ -25,12 +25,11 @@ import org.apache.storm.perf.utils.Helper; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; - -/*** - * This topo helps measure how fast a spout can produce data (so no bolts are attached) - * Spout generates a stream of a fixed string. +/** + * This topo helps measure how fast a spout can produce data (so no bolts are attached). + * + * <p>Spout generates a stream of a fixed string. */ - public class ConstSpoutOnlyTopo { public static final String TOPOLOGY_NAME = "ConstSpoutOnlyTopo"; @@ -49,7 +48,7 @@ public class ConstSpoutOnlyTopo { } /** - * ConstSpout only topology (No bolts) + * ConstSpout only topology (No bolts). */ public static void main(String[] args) throws Exception { int runTime = -1; diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java index 827337e..78eaab7 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java @@ -29,11 +29,11 @@ import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; -/*** +/** * This topo helps measure speed of word count. - * Spout loads a file into memory on initialization, then emits the lines in an endless loop. 
+ * + * <p>Spout loads a file into memory on initialization, then emits the lines in an endless loop. */ - public class FileReadWordCountTopo { public static final String SPOUT_ID = "spout"; public static final String COUNT_ID = "counter"; diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java index 4f97dfa..9876cac 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java @@ -28,13 +28,13 @@ import org.apache.storm.perf.utils.Helper; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; -/*** +/** * This topo helps measure speed of reading from Hdfs. - * Spout Reads from Hdfs. - * Bolt acks and discards tuples + * + * <p>Spout Reads from Hdfs. + * + * <p>Bolt acks and discards tuples. */ - - public class HdfsSpoutNullBoltTopo { public static final int DEFAULT_SPOUT_NUM = 1; public static final int DEFAULT_BOLT_NUM = 1; diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java index e269873..5eb61b2 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java @@ -38,12 +38,13 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.Utils; -/*** +/** * This topo helps measure speed of reading from Kafka and writing to Hdfs. - * Spout Reads from Kafka. - * Bolt writes to Hdfs + * + * <p>Spout Reads from Kafka. + * + * <p>Bolt writes to Hdfs. 
*/ - public class KafkaClientHdfsTopo { // configs - topo parallelism @@ -123,7 +124,7 @@ public class KafkaClientHdfsTopo { /** - * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming + * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming. */ public static void main(String[] args) throws Exception { @@ -132,7 +133,6 @@ public class KafkaClientHdfsTopo { return; } - Integer durationSec = Integer.parseInt(args[0]); String confFile = args[1]; Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile); topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000); @@ -143,6 +143,7 @@ public class KafkaClientHdfsTopo { topoConf.putAll(Utils.readCommandLineOpts()); // Submit topology to Storm cluster + Integer durationSec = Integer.parseInt(args[0]); Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); } @@ -156,9 +157,6 @@ public class KafkaClientHdfsTopo { /** * Overrides the default record delimiter. 
- * - * @param delimiter - * @return */ public LineWriter withLineDelimiter(String delimiter) { this.lineDelimiter = delimiter; diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java index e4972a4..38e35bc 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java @@ -18,7 +18,6 @@ package org.apache.storm.perf; - import java.util.Collections; import java.util.List; import java.util.Map; diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java index fa0d90b..0c53db9 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java @@ -36,12 +36,13 @@ import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.Utils; -/*** - * This topo helps measure speed of writing to Hdfs - * Spout generates fixed length random strings. - * Bolt writes to Hdfs +/** + * This topo helps measure speed of writing to Hdfs. + * + * <p>Spout generates fixed length random strings. + * + * <p>Bolt writes to Hdfs. 
*/ - public class StrGenSpoutHdfsBoltTopo { // configs - topo parallelism @@ -71,7 +72,7 @@ public class StrGenSpoutHdfsBoltTopo { // 2 - Setup HFS Bolt -------- - String Hdfs_url = Helper.getStr(topoConf, HDFS_URI); + String hdfsUrl = Helper.getStr(topoConf, HDFS_URI); RecordFormat format = new LineWriter("str"); SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch); FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB); @@ -83,7 +84,7 @@ public class StrGenSpoutHdfsBoltTopo { // Instantiate the HdfsBolt HdfsBolt bolt = new HdfsBolt() - .withFsUrl(Hdfs_url) + .withFsUrl(hdfsUrl) .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) @@ -102,7 +103,7 @@ public class StrGenSpoutHdfsBoltTopo { /** - * Spout generates random strings and HDFS bolt writes them to a text file + * Spout generates random strings and HDFS bolt writes them to a text file. */ public static void main(String[] args) throws Exception { String confFile = "conf/HdfsSpoutTopo.yaml"; @@ -144,9 +145,6 @@ public class StrGenSpoutHdfsBoltTopo { /** * Overrides the default record delimiter. - * - * @param delimiter - * @return */ public LineWriter withLineDelimiter(String delimiter) { this.lineDelimiter = delimiter; diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java index bd5d1e1..3d8e736 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java @@ -31,6 +31,7 @@ public class ThroughputMeter { } /** + * Calculate throughput. 
* @return events/sec */ private static double calcThroughput(long count, long startTime, long endTime) { diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java index abb397d..cc181c0 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java @@ -33,7 +33,7 @@ public class DevNullBolt extends BaseRichBolt { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(DevNullBolt.class); private OutputCollector collector; private Long sleepNanos; - private int eCount = 0; + private int count = 0; @Override public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { @@ -47,7 +47,7 @@ public class DevNullBolt extends BaseRichBolt { if (sleepNanos > 0) { LockSupport.parkNanos(sleepNanos); } - ++eCount; + ++count; } @Override diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java new file mode 100644 index 0000000..e936329 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.queuetest; + +import org.apache.storm.utils.JCQueue; + +/** + * Reads from ackerInQ and writes to spout queue. + */ +class Acker extends MyThread { + private final JCQueue ackerInQ; + private final JCQueue spoutInQ; + + public Acker(JCQueue ackerInQ, JCQueue spoutInQ) { + super("Acker"); + this.ackerInQ = ackerInQ; + this.spoutInQ = spoutInQ; + } + + + @Override + public void run() { + long start = System.currentTimeMillis(); + Handler handler = new Handler(); + while (!Thread.interrupted()) { + ackerInQ.consume(handler); + } + runTime = System.currentTimeMillis() - start; + } + + private class Handler implements JCQueue.Consumer { + @Override + public void accept(Object event) { + try { + spoutInQ.publish(event); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void flush() throws InterruptedException { + spoutInQ.flush(); + } + } +} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java new file mode 100644 index 0000000..132d793 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.queuetest; + +import org.apache.storm.utils.JCQueue; + +/** + * Writes to two queues. + */ +class AckingProducer extends MyThread { + private final JCQueue ackerInQ; + private final JCQueue spoutInQ; + + public AckingProducer(JCQueue ackerInQ, JCQueue spoutInQ) { + super("AckingProducer"); + this.ackerInQ = ackerInQ; + this.spoutInQ = spoutInQ; + } + + @Override + public void run() { + try { + Handler handler = new Handler(); + long start = System.currentTimeMillis(); + while (!Thread.interrupted()) { + int x = spoutInQ.consume(handler); + ackerInQ.publish(count); + } + runTime = System.currentTimeMillis() - start; + } catch (InterruptedException e) { + return; + } + } + + private class Handler implements JCQueue.Consumer { + @Override + public void accept(Object event) { + // no-op + } + + @Override + public void flush() { + // no-op + } + } +} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java new file mode 100644 index 0000000..52ff210 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.queuetest; + +import java.util.concurrent.locks.LockSupport; +import org.apache.storm.utils.JCQueue; +import org.apache.storm.utils.MutableLong; + +class Consumer extends MyThread { + public final MutableLong counter = new MutableLong(0); + private final JCQueue queue; + + public Consumer(JCQueue queue) { + super("Consumer"); + this.queue = queue; + } + + @Override + public void run() { + Handler handler = new Handler(); + long start = System.currentTimeMillis(); + while (!Thread.interrupted()) { + int x = queue.consume(handler); + if (x == 0) { + LockSupport.parkNanos(1); + } + } + runTime = System.currentTimeMillis() - start; + } + + @Override + public long getCount() { + return counter.get(); + } + + private class Handler implements JCQueue.Consumer { + @Override + public void accept(Object event) { + counter.increment(); + } + + @Override + public void flush() { + // no-op + } + } +} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java new file mode 100644 index 0000000..2f951dd --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java @@ -0,0 +1,70 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.queuetest; + +import java.util.concurrent.locks.LockSupport; +import org.apache.storm.utils.JCQueue; +import org.apache.storm.utils.MutableLong; + +class Forwarder extends MyThread { + public final MutableLong counter = new MutableLong(0); + private final JCQueue inq; + private final JCQueue outq; + + public Forwarder(JCQueue inq, JCQueue outq) { + super("Forwarder"); + this.inq = inq; + this.outq = outq; + } + + @Override + public void run() { + Handler handler = new Handler(); + long start = System.currentTimeMillis(); + while (!Thread.interrupted()) { + int x = inq.consume(handler); + if (x == 0) { + LockSupport.parkNanos(1); + } + } + runTime = System.currentTimeMillis() - start; + } + + @Override + public long getCount() { + return counter.get(); + } + + private class Handler implements JCQueue.Consumer { + @Override + public void accept(Object event) { + try { + outq.publish(event); + counter.increment(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void flush() { + // no-op + } + } +} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java similarity index 50% rename from examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java rename to examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java index 6ba56a9..13d880b 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java @@ -16,7 +16,7 @@ * limitations under the License */ -package org.apache.storm.perf; +package org.apache.storm.perf.queuetest; import java.util.concurrent.locks.LockSupport; import org.apache.storm.metrics2.StormMetricRegistry; @@ -24,6 +24,7 @@ import org.apache.storm.policy.WaitStrategyPark; import org.apache.storm.utils.JCQueue; import org.apache.storm.utils.MutableLong; +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") public class JCQueuePerfTest { // Usage: Let it and then explicitly terminate. // Metrics will be printed when application is terminated. 
@@ -147,240 +148,4 @@ public class JCQueuePerfTest { })); } - -} - - -abstract class MyThread extends Thread { - public long count = 0; - public long runTime = 0; - - public MyThread(String thdName) { - super(thdName); - } - - public long throughput() { - return getCount() / (runTime / 1000); - } - - public long getCount() { - return count; - } -} - -class Producer extends MyThread { - private final JCQueue q; - - public Producer(JCQueue q) { - super("Producer"); - this.q = q; - } - - @Override - public void run() { - try { - long start = System.currentTimeMillis(); - while (!Thread.interrupted()) { - q.publish(++count); - } - runTime = System.currentTimeMillis() - start; - } catch (InterruptedException e) { - return; - } - } - -} - -// writes to two queues -class Producer2 extends MyThread { - private final JCQueue q1; - private final JCQueue q2; - - public Producer2(JCQueue q1, JCQueue q2) { - super("Producer2"); - this.q1 = q1; - this.q2 = q2; - } - - @Override - public void run() { - try { - long start = System.currentTimeMillis(); - while (!Thread.interrupted()) { - q1.publish(++count); - q2.publish(count); - } - runTime = System.currentTimeMillis() - start; - } catch (InterruptedException e) { - return; - } - - } } - - -// writes to two queues -class AckingProducer extends MyThread { - private final JCQueue ackerInQ; - private final JCQueue spoutInQ; - - public AckingProducer(JCQueue ackerInQ, JCQueue spoutInQ) { - super("AckingProducer"); - this.ackerInQ = ackerInQ; - this.spoutInQ = spoutInQ; - } - - @Override - public void run() { - try { - Handler handler = new Handler(); - long start = System.currentTimeMillis(); - while (!Thread.interrupted()) { - int x = spoutInQ.consume(handler); - ackerInQ.publish(count); - } - runTime = System.currentTimeMillis() - start; - } catch (InterruptedException e) { - return; - } - } - - private class Handler implements JCQueue.Consumer { - @Override - public void accept(Object event) { - // no-op - } - - @Override - public 
void flush() { - // no-op - } - } -} - -// reads from ackerInQ and writes to spout queue -class Acker extends MyThread { - private final JCQueue ackerInQ; - private final JCQueue spoutInQ; - - public Acker(JCQueue ackerInQ, JCQueue spoutInQ) { - super("Acker"); - this.ackerInQ = ackerInQ; - this.spoutInQ = spoutInQ; - } - - - @Override - public void run() { - long start = System.currentTimeMillis(); - Handler handler = new Handler(); - while (!Thread.interrupted()) { - ackerInQ.consume(handler); - } - runTime = System.currentTimeMillis() - start; - } - - private class Handler implements JCQueue.Consumer { - @Override - public void accept(Object event) { - try { - spoutInQ.publish(event); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public void flush() throws InterruptedException { - spoutInQ.flush(); - } - } -} - -class Consumer extends MyThread { - public final MutableLong counter = new MutableLong(0); - private final JCQueue q; - - public Consumer(JCQueue q) { - super("Consumer"); - this.q = q; - } - - @Override - public void run() { - Handler handler = new Handler(); - long start = System.currentTimeMillis(); - while (!Thread.interrupted()) { - int x = q.consume(handler); - if (x == 0) { - LockSupport.parkNanos(1); - } - } - runTime = System.currentTimeMillis() - start; - } - - @Override - public long getCount() { - return counter.get(); - } - - private class Handler implements JCQueue.Consumer { - @Override - public void accept(Object event) { - counter.increment(); - } - - @Override - public void flush() { - // no-op - } - } -} - - -class Forwarder extends MyThread { - public final MutableLong counter = new MutableLong(0); - private final JCQueue inq; - private final JCQueue outq; - - public Forwarder(JCQueue inq, JCQueue outq) { - super("Forwarder"); - this.inq = inq; - this.outq = outq; - } - - @Override - public void run() { - Handler handler = new Handler(); - long start = System.currentTimeMillis(); - while 
(!Thread.interrupted()) { - int x = inq.consume(handler); - if (x == 0) { - LockSupport.parkNanos(1); - } - } - runTime = System.currentTimeMillis() - start; - } - - @Override - public long getCount() { - return counter.get(); - } - - private class Handler implements JCQueue.Consumer { - @Override - public void accept(Object event) { - try { - outq.publish(event); - counter.increment(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public void flush() { - // no-op - } - } -} \ No newline at end of file diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java new file mode 100644 index 0000000..4c84316 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.queuetest; + +abstract class MyThread extends Thread { + public long count = 0; + public long runTime = 0; + + public MyThread(String thdName) { + super(thdName); + } + + public long throughput() { + return getCount() / (runTime / 1000); + } + + public long getCount() { + return count; + } +} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java new file mode 100644 index 0000000..eb02d5e --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.queuetest; + +import org.apache.storm.utils.JCQueue; + +class Producer extends MyThread { + private final JCQueue queue; + + public Producer(JCQueue queue) { + super("Producer"); + this.queue = queue; + } + + @Override + public void run() { + try { + long start = System.currentTimeMillis(); + while (!Thread.interrupted()) { + queue.publish(++count); + } + runTime = System.currentTimeMillis() - start; + } catch (InterruptedException e) { + return; + } + } + +} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java new file mode 100644 index 0000000..8df519b --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.queuetest; + +import org.apache.storm.utils.JCQueue; + +/** + * Writes to two queues. 
+ */ +class Producer2 extends MyThread { + private final JCQueue q1; + private final JCQueue q2; + + public Producer2(JCQueue q1, JCQueue q2) { + super("Producer2"); + this.q1 = q1; + this.q2 = q2; + } + + @Override + public void run() { + try { + long start = System.currentTimeMillis(); + while (!Thread.interrupted()) { + q1.publish(++count); + q2.publish(count); + } + runTime = System.currentTimeMillis() - start; + } catch (InterruptedException e) { + return; + } + + } +} \ No newline at end of file diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java index f46d6f3..a6a1fc9 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java @@ -63,8 +63,9 @@ public class WordGenSpout extends BaseRichSpout { try { String line; while ((line = reader.readLine()) != null) { - for (String word : line.split("\\s+")) + for (String word : line.split("\\s+")) { lines.add(word); + } } } catch (IOException e) { throw new RuntimeException("Reading file failed", e); diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java new file mode 100644 index 0000000..3783310 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.toolstest; + +import java.util.concurrent.locks.LockSupport; +import org.apache.storm.utils.MutableLong; +import org.jctools.queues.MpscArrayQueue; + +class Cons extends MyThd { + public final MutableLong counter = new MutableLong(0); + private final MpscArrayQueue<Object> queue; + + public Cons(MpscArrayQueue<Object> queue) { + super("Consumer"); + this.queue = queue; + } + + @Override + public void run() { + Handler handler = new Handler(); + long start = System.currentTimeMillis(); + + while (!halt) { + int x = queue.drain(handler); + if (x == 0) { + LockSupport.parkNanos(1); + } else { + counter.increment(); + } + } + runTime = System.currentTimeMillis() - start; + } + + @Override + public long getCount() { + return counter.get(); + } + + private class Handler implements org.jctools.queues.MessagePassingQueue.Consumer<Object> { + @Override + public void accept(Object event) { + counter.increment(); + } + } +} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java similarity index 60% rename from examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java rename to examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java index a439522..b5afe28 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java @@ -16,12 +16,11 @@ * 
limitations under the License */ -package org.apache.storm.perf; +package org.apache.storm.perf.toolstest; -import java.util.concurrent.locks.LockSupport; -import org.apache.storm.utils.MutableLong; import org.jctools.queues.MpscArrayQueue; +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") public class JCToolsPerfTest { public static void main(String[] args) throws Exception { // oneProducer1Consumer(); @@ -120,107 +119,4 @@ public class JCToolsPerfTest { } -abstract class MyThd extends Thread { - public long count = 0; - public long runTime = 0; - public boolean halt = false; - public MyThd(String thdName) { - super(thdName); - } - - public long throughput() { - return getCount() / (runTime / 1000); - } - - public long getCount() { - return count; - } -} - -class Prod extends MyThd { - private final MpscArrayQueue<Object> q; - - public Prod(MpscArrayQueue<Object> q) { - super("Producer"); - this.q = q; - } - - @Override - public void run() { - long start = System.currentTimeMillis(); - - while (!halt) { - ++count; - while (!q.offer(count)) { - if (Thread.interrupted()) { - return; - } - } - } - runTime = System.currentTimeMillis() - start; - } - -} - -// writes to two queues -class Prod2 extends MyThd { - private final MpscArrayQueue<Object> q1; - private final MpscArrayQueue<Object> q2; - - public Prod2(MpscArrayQueue<Object> q1, MpscArrayQueue<Object> q2) { - super("Producer2"); - this.q1 = q1; - this.q2 = q2; - } - - @Override - public void run() { - long start = System.currentTimeMillis(); - - while (!halt) { - q1.offer(++count); - q2.offer(count); - } - runTime = System.currentTimeMillis() - start; - } -} - - -class Cons extends MyThd { - public final MutableLong counter = new MutableLong(0); - private final MpscArrayQueue<Object> q; - - public Cons(MpscArrayQueue<Object> q) { - super("Consumer"); - this.q = q; - } - - @Override - public void run() { - Handler handler = new Handler(); - long start = System.currentTimeMillis(); - - while (!halt) { - 
int x = q.drain(handler); - if (x == 0) { - LockSupport.parkNanos(1); - } else { - counter.increment(); - } - } - runTime = System.currentTimeMillis() - start; - } - - @Override - public long getCount() { - return counter.get(); - } - - private class Handler implements org.jctools.queues.MessagePassingQueue.Consumer<Object> { - @Override - public void accept(Object event) { - counter.increment(); - } - } -} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java new file mode 100644 index 0000000..fbd7aab --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Base class for the worker threads of the JCTools perf test.
 *
 * <p>A subclass advances {@link #count} while it runs, polls {@link #halt} to know when to stop,
 * and stores the elapsed wall-clock time, in milliseconds, in {@link #runTime} when it stops.
 */
abstract class MyThd extends Thread {
    /** Number of items handled so far. */
    public long count = 0;
    /** Elapsed run time in milliseconds; remains 0 until the subclass records it. */
    public long runTime = 0;
    /**
     * Stop flag polled by the run loops of subclasses. It is expected to be set from another
     * thread, so it is volatile to guarantee that a spinning loop observes the update.
     */
    public volatile boolean halt = false;

    public MyThd(String thdName) {
        super(thdName);
    }

    /**
     * Computes the throughput in items per second.
     *
     * <p>The elapsed time is used with millisecond precision. Truncating it to whole seconds
     * first (as {@code runTime / 1000} does) overstates the rate and divides by zero for any run
     * shorter than one second.
     *
     * @return {@link #getCount()} divided by the elapsed seconds, rounded down, or 0 when no
     *     run time has been recorded yet
     */
    public long throughput() {
        if (runTime <= 0) {
            return 0;
        }
        return (long) (getCount() * 1000.0 / runTime);
    }

    /**
     * Returns the number of items handled so far.
     *
     * @return the current value of {@link #count}
     */
    public long getCount() {
        return count;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.toolstest; + +import org.jctools.queues.MpscArrayQueue; + +class Prod extends MyThd { + private final MpscArrayQueue<Object> queue; + + public Prod(MpscArrayQueue<Object> queue) { + super("Producer"); + this.queue = queue; + } + + @Override + public void run() { + long start = System.currentTimeMillis(); + + while (!halt) { + ++count; + while (!queue.offer(count)) { + if (interrupted()) { + return; + } + } + } + runTime = System.currentTimeMillis() - start; + } + +} \ No newline at end of file diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java new file mode 100644 index 0000000..470522a --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.toolstest; + +import org.jctools.queues.MpscArrayQueue; + +/** + * Writes to two queues. 
+ */ +class Prod2 extends MyThd { + private final MpscArrayQueue<Object> q1; + private final MpscArrayQueue<Object> q2; + + public Prod2(MpscArrayQueue<Object> q1, MpscArrayQueue<Object> q2) { + super("Producer2"); + this.q1 = q1; + this.q2 = q2; + } + + @Override + public void run() { + long start = System.currentTimeMillis(); + + while (!halt) { + q1.offer(++count); + q2.offer(count); + } + runTime = System.currentTimeMillis() - start; + } +} diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java index b3a206c..49be70e 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java @@ -68,7 +68,7 @@ public class Helper { } /** - * Kill topo on Ctrl-C + * Kill topo on Ctrl-C. */ public static void setupShutdownHook(final String topoName) { Map<String, Object> clusterConf = Utils.readStormConfig(); diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java index b70d06a..02e223e 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java @@ -82,8 +82,6 @@ public class MetricsSample { // number of spout executors int spoutExecCount = 0; double spoutLatencySum = 0.0; - - long spoutEmitted = 0L; long spoutTransferred = 0L; // Executor summaries @@ -156,6 +154,8 @@ public class MetricsSample { ret.totalAcked = totalAcked; ret.totalFailed = totalFailed; ret.totalLatency = spoutLatencySum / spoutExecCount; + + long spoutEmitted = 0L; ret.spoutEmitted = spoutEmitted; ret.spoutTransferred = spoutTransferred; ret.sampleTime = System.currentTimeMillis();