[FLINK-6695] Activate strict checkstyle for flink-storm-examples
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a2d984f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a2d984f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a2d984f Branch: refs/heads/master Commit: 2a2d984f7258e62eff34b4deba3f803529553227 Parents: 40cb093 Author: zentol <ches...@apache.org> Authored: Tue May 23 23:47:00 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Thu Jun 1 11:14:11 2017 +0200 ---------------------------------------------------------------------- flink-contrib/flink-storm-examples/pom.xml | 35 ++++++++++++++++++++ .../storm/exclamation/ExclamationLocal.java | 25 +++++++------- .../storm/exclamation/ExclamationTopology.java | 25 +++++++------- .../storm/exclamation/ExclamationWithBolt.java | 15 +++++---- .../storm/exclamation/ExclamationWithSpout.java | 15 +++++---- .../exclamation/operators/ExclamationBolt.java | 8 +++-- .../flink/storm/join/SingleJoinExample.java | 17 +++++----- .../flink/storm/print/PrintSampleStream.java | 14 ++++---- .../flink/storm/split/SpoutSplitExample.java | 11 +++--- .../storm/split/operators/RandomSpout.java | 10 ++++-- .../split/operators/VerifyAndEnrichBolt.java | 9 +++-- .../flink/storm/util/AbstractBoltSink.java | 4 +-- .../flink/storm/util/AbstractLineSpout.java | 2 +- .../org/apache/flink/storm/util/FileSpout.java | 6 ++-- .../flink/storm/util/FiniteFileSpout.java | 1 + .../flink/storm/util/OutputFormatter.java | 8 +++-- .../flink/storm/util/SimpleOutputFormatter.java | 6 +++- .../flink/storm/util/TupleOutputFormatter.java | 4 +++ .../storm/wordcount/BoltTokenizerWordCount.java | 15 +++++---- .../wordcount/BoltTokenizerWordCountPojo.java | 15 +++++---- .../BoltTokenizerWordCountWithNames.java | 19 ++++++----- .../storm/wordcount/SpoutSourceWordCount.java | 18 +++++----- .../flink/storm/wordcount/WordCountLocal.java | 26 +++++++-------- .../storm/wordcount/WordCountLocalByName.java | 26 +++++++-------- .../wordcount/WordCountRemoteByClient.java | 28 ++++++++-------- .../wordcount/WordCountRemoteBySubmitter.java | 28 ++++++++-------- .../storm/wordcount/WordCountTopology.java | 30 ++++++++--------- .../storm/wordcount/operators/BoltCounter.java | 4 +-- .../wordcount/operators/BoltCounterByName.java | 4 +-- .../wordcount/operators/BoltTokenizer.java | 4 +-- .../operators/BoltTokenizerByName.java | 4 +-- .../wordcount/operators/WordCountDataPojos.java | 12 +++++-- .../wordcount/operators/WordCountDataTuple.java | 5 ++- .../operators/WordCountInMemorySpout.java | 5 +-- .../exclamation/ExclamationWithBoltITCase.java | 3 ++ .../exclamation/ExclamationWithSpoutITCase.java | 3 ++ .../StormExclamationLocalITCase.java | 3 ++ .../storm/exclamation/util/ExclamationData.java | 3 ++ .../flink/storm/join/SingleJoinITCase.java | 8 +++-- .../org/apache/flink/storm/split/SplitBolt.java | 8 +++-- .../flink/storm/split/SplitBoltTopology.java | 19 +++++++---- .../apache/flink/storm/split/SplitITCase.java | 11 ++++-- .../flink/storm/split/SplitSpoutTopology.java | 18 ++++++---- .../flink/storm/split/SplitStreamBoltLocal.java | 11 ++++-- .../storm/split/SplitStreamSpoutLocal.java | 11 ++++-- .../storm/tests/StormFieldsGroupingITCase.java | 24 ++++++++------ .../flink/storm/tests/StormMetaDataITCase.java | 18 ++++++---- .../flink/storm/tests/StormUnionITCase.java | 22 +++++++----- .../tests/operators/FiniteRandomSpout.java | 11 ++++-- .../flink/storm/tests/operators/MergerBolt.java | 8 +++-- .../storm/tests/operators/MetaDataSpout.java | 8 +++-- .../flink/storm/tests/operators/TaskIdBolt.java | 5 +-- .../tests/operators/VerifyMetaDataBolt.java | 8 +++-- .../wordcount/BoltTokenizerWordCountITCase.java | 3 ++ .../BoltTokenizerWordCountPojoITCase.java | 3 ++ .../BoltTokenizerWordCountWithNamesITCase.java | 3 ++ .../wordcount/SpoutSourceWordCountITCase.java | 3 ++ .../storm/wordcount/WordCountLocalITCase.java | 3 ++ .../wordcount/WordCountLocalNamedITCase.java | 4 ++- 59 files changed, 425 insertions(+), 254 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml index 6ef0f7b..0296ff3 100644 --- a/flink-contrib/flink-storm-examples/pom.xml +++ b/flink-contrib/flink-storm-examples/pom.xml @@ -384,6 +384,41 @@ under the License. </execution> </executions> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>6.19</version> + </dependency> + </dependencies> + <configuration> + <configLocation>/tools/maven/strict-checkstyle.xml</configLocation> + <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <logViolationsToConsole>true</logViolationsToConsole> + <failOnViolation>true</failOnViolation> + </configuration> + <executions> + <!-- + Execute checkstyle after compilation but before tests. + + This ensures that any parsing or type checking errors are from + javac, so they look as expected. Beyond that, we want to + fail as early as possible. + --> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> <pluginManagement> http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java index c37ae65..6108f79 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java @@ -17,34 +17,35 @@ package org.apache.flink.storm.exclamation; -import org.apache.storm.Config; -import org.apache.storm.topology.TopologyBuilder; import org.apache.flink.storm.api.FlinkLocalCluster; import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.exclamation.operators.ExclamationBolt; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; + /** * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text files in a streaming * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology} and submitted to * Flink for execution in the same way as to a Storm {@link org.apache.storm.LocalCluster}. - * <p> - * This example shows how to run program directly within Java, thus it cannot be used to submit a + * + * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a * {@link org.apache.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink). - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>ExclamationLocal <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>ExclamationLocal <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>run a regular Storm program locally on Flink</li> * </ul> */ public class ExclamationLocal { - public final static String topologyId = "Streaming Exclamation"; + public static final String TOPOLOGY_ID = "Streaming Exclamation"; // ************************************************************************* // PROGRAM @@ -65,7 +66,7 @@ public class ExclamationLocal { conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); + cluster.submitTopology(TOPOLOGY_ID, conf, FlinkTopology.createTopology(builder)); cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java index 0144acb..51edd1f 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java @@ -17,7 +17,6 @@ package org.apache.flink.storm.exclamation; -import org.apache.storm.topology.TopologyBuilder; import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.storm.exclamation.operators.ExclamationBolt; import org.apache.flink.storm.util.BoltFileSink; @@ -27,17 +26,19 @@ import org.apache.flink.storm.util.FiniteInMemorySpout; import org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.SimpleOutputFormatter; +import org.apache.storm.topology.TopologyBuilder; + /** * Implements the "Exclamation" program that attaches two exclamation marks to every line of a text files in a streaming * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] <text path> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] <text path> * <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>construct a regular Storm topology as Flink program</li> * <li>make use of the FiniteSpout interface</li> @@ -45,11 +46,11 @@ import org.apache.flink.storm.util.SimpleOutputFormatter; */ public class ExclamationTopology { - public final static String spoutId = "source"; - public final static String firstBoltId = "exclamation1"; - public final static String secondBoltId = "exclamation2"; - public final static String sinkId = "sink"; - private final static OutputFormatter formatter = new SimpleOutputFormatter(); + private static final String spoutId = "source"; + private static final String firstBoltId = "exclamation1"; + private static final String secondBoltId = "exclamation2"; + private static final String sinkId = "sink"; + private static final OutputFormatter formatter = new SimpleOutputFormatter(); public static TopologyBuilder buildTopology() { final TopologyBuilder builder = new TopologyBuilder(); http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java index 5a79119..a838e69 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java @@ -18,7 +18,6 @@ package org.apache.flink.storm.exclamation; -import org.apache.storm.utils.Utils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.examples.java.wordcount.util.WordCountData; @@ -28,17 +27,19 @@ import org.apache.flink.storm.wrappers.BoltWrapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.storm.utils.Utils; + /** * Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text files in a streaming * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: * <code>ExclamationWithmBolt <text path> <result path> <number of exclamation marks></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData} with x=2. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>use a Bolt within a Flink Streaming program</li> * <li>how to configure a Bolt using StormConfig</li> http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java index 237f1d4..b165f00 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java @@ -18,7 +18,6 @@ package org.apache.flink.storm.exclamation; -import org.apache.storm.utils.Utils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.examples.java.wordcount.util.WordCountData; @@ -29,16 +28,18 @@ import org.apache.flink.storm.wrappers.SpoutWrapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.storm.utils.Utils; + /** * Implements the "Exclamation" program that attaches six exclamation marks to every line of a text files in a streaming * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>ExclamationWithSpout <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>ExclamationWithSpout <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>use a Storm spout within a Flink Streaming program</li> * <li>make use of the FiniteSpout interface</li> http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java index 77a91d2..8872acd 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java @@ -28,10 +28,14 @@ import org.apache.storm.tuple.Values; import java.util.Map; +/** + * A Bolt implementation that appends exclamation marks to incoming tuples. The number of added exclamation marks can + * be controlled by setting <code>exclamation.count</code>. + */ public class ExclamationBolt implements IRichBolt { - private final static long serialVersionUID = -6364882114201311380L; + private static final long serialVersionUID = -6364882114201311380L; - public final static String EXCLAMATION_COUNT = "exclamation.count"; + public static final String EXCLAMATION_COUNT = "exclamation.count"; private OutputCollector collector; private String exclamation; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java index 41ea4cb..b2ad05f 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java @@ -15,13 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.join; -import org.apache.storm.Config; -import org.apache.storm.testing.FeederSpout; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; +package org.apache.flink.storm.join; import org.apache.flink.storm.api.FlinkLocalCluster; import org.apache.flink.storm.api.FlinkTopology; @@ -29,10 +24,17 @@ import org.apache.flink.storm.util.BoltFileSink; import org.apache.flink.storm.util.NullTerminatingSpout; import org.apache.flink.storm.util.TupleOutputFormatter; +import org.apache.storm.Config; import org.apache.storm.starter.bolt.PrinterBolt; import org.apache.storm.starter.bolt.SingleJoinBolt; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; - +/** + * Implements a simple example where 2 input streams are being joined. + */ public class SingleJoinExample { public static void main(String[] args) throws Exception { @@ -79,7 +81,6 @@ public class SingleJoinExample { ageSpout.feed(new Values(i, i + 20)); } - final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); cluster.submitTopology("joinTopology", conf, FlinkTopology.createTopology(builder)); cluster.shutdown(); http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java index da2e641..6157e2c 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java @@ -18,13 +18,14 @@ package org.apache.flink.storm.print; -import org.apache.storm.Config; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.utils.Utils; import org.apache.flink.storm.api.FlinkLocalCluster; import org.apache.flink.storm.api.FlinkTopology; + +import org.apache.storm.Config; import org.apache.storm.starter.bolt.PrinterBolt; import org.apache.storm.starter.spout.TwitterSampleSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; import java.util.Arrays; @@ -32,15 +33,15 @@ import java.util.Arrays; * Prints incoming tweets. Tweets can be filtered by keywords. */ public class PrintSampleStream { - + public static void main(String[] args) throws Exception { - + if (args.length < 4) { System.err.println( "Usage: PrintSampleStream <consumer-key> <consumer-secret> <access-token> <access-token-secret>"); return; } - + String consumerKey = args[0]; String consumerSecret = args[1]; String accessToken = args[2]; @@ -56,7 +57,6 @@ public class PrintSampleStream { builder.setBolt("print", new PrinterBolt()) .shuffleGrouping("twitter"); - Config conf = new Config(); final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java index 02131fc..c5bb5c3 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.split; import org.apache.flink.api.common.functions.MapFunction; @@ -33,15 +34,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Implements a simple example with two declared output streams for the embedded spout. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>handle multiple output stream of a spout</li> * <li>accessing each stream by .split(...) and .select(...)</li> * <li>strip wrapper data type SplitStreamType for further processing in Flink</li> * </ul> - * <p> - * This example would work the same way for multiple bolt output streams. + * + * <p>This example would work the same way for multiple bolt output streams. */ public class SpoutSplitExample { @@ -94,7 +95,7 @@ public class SpoutSplitExample { /** * Same as {@link VerifyAndEnrichBolt}. */ - public final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> { + public static final class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> { private static final long serialVersionUID = 5213888269197438892L; private final Tuple2<String, Integer> out; private final boolean isEven; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java index 5fbe0a7..afec47f 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java @@ -15,10 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.split.operators; -import java.util.Map; -import java.util.Random; +package org.apache.flink.storm.split.operators; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -27,6 +25,12 @@ import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; +import java.util.Map; +import java.util.Random; + +/** + * A Spout implementation that emits random numbers, optionally splitting them into odd/even streams. + */ public class RandomSpout extends BaseRichSpout { private static final long serialVersionUID = -3978554318742509334L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java index 1ad9a6c..a39ec9c 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java @@ -15,9 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.split.operators; -import java.util.Map; +package org.apache.flink.storm.split.operators; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -27,6 +26,12 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import java.util.Map; + +/** + * Verifies that incoming numbers are either even or odd, controlled by the constructor argument. Emitted tuples are + * enriched with a new string field containing either "even" or "odd", based on the number's parity. + */ public class VerifyAndEnrichBolt extends BaseRichBolt { private static final long serialVersionUID = -7277395570966328721L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java index 2cb346a..5ae8cfb 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java @@ -50,7 +50,7 @@ public abstract class AbstractBoltSink implements IRichBolt { } } - protected abstract void prepareSimple(final Map<?, ?> stormConf, final TopologyContext context); + protected abstract void prepareSimple(Map<?, ?> stormConf, TopologyContext context); @Override public final void execute(final Tuple input) { @@ -60,7 +60,7 @@ public abstract class AbstractBoltSink implements IRichBolt { this.writeExternal(this.lineBuilder.toString()); } - protected abstract void writeExternal(final String line); + protected abstract void writeExternal(String line); @Override public void cleanup() {/* nothing to do */} http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java index 29df23e..caefd56 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java @@ -32,7 +32,7 @@ import java.util.Map; public abstract class AbstractLineSpout implements IRichSpout { private static final long serialVersionUID = 8876828403487806771L; - public final static String ATTRIBUTE_LINE = "line"; + public static final String ATTRIBUTE_LINE = "line"; protected SpoutOutputCollector collector; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java index 0a295e7..0533b09 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java @@ -33,7 +33,7 @@ import java.util.Map; public class FileSpout extends AbstractLineSpout { private static final long serialVersionUID = -6996907090003590436L; - public final static String INPUT_FILE_PATH = "input.path"; + public static final String INPUT_FILE_PATH = "input.path"; protected String path = null; protected BufferedReader reader; @@ -50,8 +50,8 @@ public class FileSpout extends AbstractLineSpout { super.open(conf, context, collector); Object configuredPath = conf.get(INPUT_FILE_PATH); - if(configuredPath != null) { - this.path = (String)configuredPath; + if (configuredPath != null) { + this.path = (String) configuredPath; } try { http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java index 48349c2..e4f39ab 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.apache.storm.spout.SpoutOutputCollector; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java index fe28afc..a0f933f 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java @@ -22,16 +22,20 @@ import org.apache.storm.tuple.Tuple; import java.io.Serializable; +/** + * Interface that is used to convert Storm {@link Tuple Tuples} to a string before writing them out to a file or to the + * console. + */ public interface OutputFormatter extends Serializable { /** * Converts a Storm {@link Tuple} to a string. This method is used for formatting the output tuples before writing * them out to a file or to the console. - * + * * @param input * The tuple to be formatted * @return The string result of the formatting */ - public String format(Tuple input); + String format(Tuple input); } http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java index 323fb53..bf30cd2 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java @@ -20,13 +20,17 @@ package org.apache.flink.storm.util; import org.apache.storm.tuple.Tuple; +/** + * Simple {@link OutputFormatter} implementation to convert {@link Tuple Tuples} with a size of 1 by returning the + * result of {@link Object#toString()} for the first field. + */ public class SimpleOutputFormatter implements OutputFormatter { private static final long serialVersionUID = 6349573860144270338L; /** * Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that field. This method is * used for formatting raw outputs wrapped in tuples, before writing them out to a file or to the console. - * + * * @param input * The tuple to be formatted * @return The string result of the formatting http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java index 11d23cd..42189a7 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java @@ -20,6 +20,10 @@ package org.apache.flink.storm.util; import org.apache.storm.tuple.Tuple; +/** + * {@link OutputFormatter} implementation that converts {@link Tuple Tuples} of arbitrary size to a string. For a given + * tuple the output is <code>(field1,field2,...,fieldX)</code>. + */ public class TupleOutputFormatter implements OutputFormatter { private static final long serialVersionUID = -599665757723851761L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java index 4620d9d..6f7addf 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java @@ -17,7 +17,6 @@ package org.apache.flink.storm.wordcount; -import org.apache.storm.topology.IRichBolt; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.examples.java.wordcount.util.WordCountData; @@ -26,16 +25,18 @@ import org.apache.flink.storm.wrappers.BoltWrapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.storm.topology.IRichBolt; + /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>WordCount <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>WordCount <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>use a Bolt within a Flink Streaming program.</li> * </ul> http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java index eefbf78..125a044 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java @@ -17,7 +17,6 @@ package org.apache.flink.storm.wordcount; -import org.apache.storm.topology.IRichBolt; import org.apache.flink.api.java.io.CsvInputFormat; import org.apache.flink.api.java.io.PojoCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple2; @@ -32,17 +31,19 @@ import org.apache.flink.storm.wrappers.BoltWrapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.storm.topology.IRichBolt; + /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. In contrast to {@link BoltTokenizerWordCount} * the tokenizer's input is a POJO type and the single field is accessed by name. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>WordCount <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>WordCount <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>how to access attributes by name within a Bolt for POJO type input streams * </ul> http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java index 98f7f96..f469bab 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java @@ -17,8 +17,6 @@ package org.apache.flink.storm.wordcount; -import org.apache.storm.topology.IRichBolt; -import org.apache.storm.tuple.Fields; import org.apache.flink.api.java.io.CsvInputFormat; import org.apache.flink.api.java.io.TupleCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple; @@ -34,17 +32,20 @@ import org.apache.flink.storm.wrappers.BoltWrapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Fields; + /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. In contrast to {@link BoltTokenizerWordCount} * the tokenizer's input is a {@link Tuple} type and the single field is accessed by name. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>WordCount <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>WordCount <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>how to access attributes by name within a Bolt for {@link Tuple} type input streams * </ul> @@ -120,7 +121,7 @@ public class BoltTokenizerWordCountWithNames { private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) { if (fileOutput) { // read the text file from given input path - TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor + TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>) TypeExtractor .getForObject(new Tuple1<String>("")); return env.createInput(new TupleCsvInputFormat<Tuple1<String>>(new Path( textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER, http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java index 683a3b5..f0cfd7a 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java @@ -17,9 +17,6 @@ package org.apache.flink.storm.wordcount; -import org.apache.storm.topology.IRichSpout; -import org.apache.storm.utils.Utils; - import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -31,16 +28,19 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.utils.Utils; + /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The used data source is a {@link IRichSpout Spout}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>WordCount <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>WordCount <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>use a Spout within a Flink Streaming program.</li> * </ul> http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java index ee880ba..82c8ae3 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java @@ -17,35 +17,35 @@ package org.apache.flink.storm.wordcount; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; + import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.storm.api.FlinkTopology; - /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the * same way as to a Storm {@link LocalCluster}. - * <p> - * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology} + * + * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology} * via Flink command line clients (ie, bin/flink). - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>WordCount <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>WordCount <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>run a regular Storm program locally on Flink</li> * </ul> */ public class WordCountLocal { - public final static String topologyId = "Storm WordCount"; + private static final String topologyId = "Storm WordCount"; // ************************************************************************* // PROGRAM http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java index ab423cf..b960b79 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java @@ -17,36 +17,36 @@ package org.apache.flink.storm.wordcount; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; + import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.storm.api.FlinkTopology; - /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the * same way as to a Storm {@link LocalCluster}. In contrast to {@link WordCountLocal} all bolts access the field of * input tuples by name instead of index. - * <p> - * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology} + * + * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology} * via Flink command line clients (ie, bin/flink). - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>WordCountLocalByName <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>WordCountLocalByName <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>run a regular Storm program locally on Flink * </ul> */ public class WordCountLocalByName { - public final static String topologyId = "Storm WordCountName"; + private static final String topologyId = "Storm WordCountName"; // ************************************************************************* // PROGRAM http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java index 5c99f93..8dff6d7 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java @@ -17,6 +17,10 @@ package org.apache.flink.storm.wordcount; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.api.FlinkClient; +import org.apache.flink.storm.api.FlinkTopology; + import org.apache.storm.Config; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.InvalidTopologyException; @@ -26,31 +30,27 @@ import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.NimbusClient; import org.apache.storm.utils.Utils; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.storm.api.FlinkClient; -import org.apache.flink.storm.api.FlinkTopology; - /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote. - * <p> - * This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via + * + * <p>This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via * Flink command line clients (ie, bin/flink). - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>WordCountRemoteByClient <text path> <result path></code><br> + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>WordCountRemoteByClient <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>submit a regular Storm program to a local or remote Flink cluster.</li> * </ul> */ public class WordCountRemoteByClient { - public final static String topologyId = "Storm WordCount"; - private final static String uploadedJarLocation = "WordCount-StormTopology.jar"; + private static final String topologyId = "Storm WordCount"; + private static final String uploadedJarLocation = "WordCount-StormTopology.jar"; // ************************************************************************* // PROGRAM http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java index 08ba52a..745ec85 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java @@ -17,35 +17,35 @@ package org.apache.flink.storm.wordcount; -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; - -import org.apache.storm.topology.TopologyBuilder; import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.storm.api.FlinkClient; import org.apache.flink.storm.api.FlinkSubmitter; import org.apache.flink.storm.api.FlinkTopology; +import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.TopologyBuilder; + /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote. - * <p> - * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink). - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>WordCountRemoteBySubmitter <text path> <result path></code><br> + * + * <p>This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink). + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: <code>WordCountRemoteBySubmitter <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>submit a regular Storm program to a local or remote Flink cluster.</li> * </ul> */ public class WordCountRemoteBySubmitter { - public final static String topologyId = "Storm WordCount"; + private static final String topologyId = "Storm WordCount"; // ************************************************************************* // PROGRAM http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java index 8f855b5..8627145 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java @@ -17,10 +17,6 @@ package org.apache.flink.storm.wordcount; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; - import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.storm.util.BoltFileSink; import org.apache.flink.storm.util.BoltPrintSink; @@ -34,27 +30,31 @@ import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName; import org.apache.flink.storm.wordcount.operators.WordCountFileSpout; import org.apache.flink.storm.wordcount.operators.WordCountInMemorySpout; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming * fashion. The program is constructed as a regular {@link StormTopology}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: + * + * <p>The input is a plain text file with lines separated by newline characters. + * + * <p>Usage: * <code>WordCount[Local|LocalByName|RemoteByClient|RemoteBySubmitter] <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p> - * This example shows how to: + * + * <p>This example shows how to: * <ul> * <li>how to construct a regular Storm topology as Flink program</li> * </ul> */ public class WordCountTopology { - public final static String spoutId = "source"; - public final static String tokenierzerId = "tokenizer"; - public final static String counterId = "counter"; - public final static String sinkId = "sink"; - private final static OutputFormatter formatter = new TupleOutputFormatter(); + private static final String spoutId = "source"; + private static final String tokenierzerId = "tokenizer"; + private static final String counterId = "counter"; + private static final String sinkId = "sink"; + private static final OutputFormatter formatter = new TupleOutputFormatter(); public static TopologyBuilder buildTopology() { return buildTopology(true); http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java index 4a00869..34fc703 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java @@ -32,8 +32,8 @@ import java.util.Map; * Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple * schema: {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema: * {@code <String,Integer>} ). - * <p> - * Same as {@link BoltCounterByName}, but accesses input attribute by index (instead of name). + * + * <p>Same as {@link BoltCounterByName}, but accesses input attribute by index (instead of name). */ public class BoltCounter implements IRichBolt { private static final long serialVersionUID = 399619605462625934L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java index e3e0d58..cd53d50 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java @@ -32,8 +32,8 @@ import java.util.Map; * Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple * schema: {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema: * {@code <String,Integer>} ). - * <p> - * Same as {@link BoltCounter}, but accesses input attribute by name (instead of index). + * + * <p>Same as {@link BoltCounter}, but accesses input attribute by name (instead of index). */ public class BoltCounterByName implements IRichBolt { private static final long serialVersionUID = 399619605462625934L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java index cedd90a..41e8a8d 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java @@ -31,8 +31,8 @@ import java.util.Map; * Implements the string tokenizer that splits sentences into words as a bolt. The bolt takes a line (input tuple * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema: * {@code <String,Integer>}). - * <p> - * Same as {@link BoltTokenizerByName}, but accesses input attribute by index (instead of name). + * + * <p>Same as {@link BoltTokenizerByName}, but accesses input attribute by index (instead of name). */ public final class BoltTokenizer implements IRichBolt { private static final long serialVersionUID = -8589620297208175149L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java index 258d412..dff39eb 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java @@ -31,8 +31,8 @@ import java.util.Map; * Implements the string tokenizer that splits sentences into words as a bolt. The bolt takes a line (input tuple * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema: * {@code <String,Integer>}). - * <p> - * Same as {@link BoltTokenizer}, but accesses input attribute by name (instead of index). + * + * <p>Same as {@link BoltTokenizer}, but accesses input attribute by name (instead of index). */ public final class BoltTokenizerByName implements IRichBolt { private static final long serialVersionUID = -8589620297208175149L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java index 3a8fd3a..d63974b 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java @@ -17,12 +17,15 @@ package org.apache.flink.storm.wordcount.operators; -import java.io.Serializable; - import org.apache.flink.examples.java.wordcount.util.WordCountData; +import java.io.Serializable; + +/** + * Input POJOs for WordCount programs. + */ public class WordCountDataPojos { - public static Sentence[] SENTENCES; + public static final Sentence[] SENTENCES; static { SENTENCES = new Sentence[WordCountData.WORDS.length]; @@ -31,6 +34,9 @@ public class WordCountDataPojos { } } + /** + * Simple POJO containing a string. + */ public static class Sentence implements Serializable { private static final long serialVersionUID = -7336372859203407522L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java index 16e2ba0..d01d9a2 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java @@ -20,9 +20,12 @@ package org.apache.flink.storm.wordcount.operators; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.examples.java.wordcount.util.WordCountData; +/** + * Input tuples for WordCount programs. + */ @SuppressWarnings("unchecked") public class WordCountDataTuple { - public static Tuple1<String>[] TUPLES; + public static final Tuple1<String>[] TUPLES; static { TUPLES = new Tuple1[WordCountData.WORDS.length]; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java index 7bf40c2..be376a9 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java @@ -17,11 +17,12 @@ package org.apache.flink.storm.wordcount.operators; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.storm.util.FiniteInMemorySpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; + /** * Implements a Spout that reads data from {@link WordCountData#WORDS}. */ http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java index 5a37572..358919f 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java @@ -22,6 +22,9 @@ import org.apache.flink.storm.exclamation.util.ExclamationData; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +/** + * Test for the ExclamationWithBolt example. + */ public class ExclamationWithBoltITCase extends StreamingProgramTestBase { protected String textPath; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java index c2b0467..61310e8 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java @@ -22,6 +22,9 @@ import org.apache.flink.storm.exclamation.util.ExclamationData; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +/** + * Test for the ExclamantionWithSpout example. + */ public class ExclamationWithSpoutITCase extends StreamingProgramTestBase { protected String textPath; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java index 049c881..bc09a3d 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java @@ -22,6 +22,9 @@ import org.apache.flink.storm.exclamation.util.ExclamationData; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +/** + * Test for the ExclamationLocal example. + */ public class StormExclamationLocalITCase extends StreamingProgramTestBase { protected String textPath; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java index 3c435f9..f700009 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java @@ -18,6 +18,9 @@ package org.apache.flink.storm.exclamation.util; +/** + * Expected output of Exclamation programs. + */ public class ExclamationData { public static final String TEXT_WITH_EXCLAMATIONS = http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java index b51db2c..83531ba 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java @@ -18,12 +18,16 @@ package org.apache.flink.storm.join; -import com.google.common.base.Joiner; import org.apache.flink.streaming.util.StreamingProgramTestBase; +import com.google.common.base.Joiner; + +/** + * Test for the SingleJoin example. + */ public class SingleJoinITCase extends StreamingProgramTestBase { - protected static String expectedOutput[] = { + protected static String[] expectedOutput = { "(male,20)", "(female,21)", "(male,22)", http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java index 0fc1ba5..90ee795 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java @@ -15,9 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.split; -import java.util.Map; +package org.apache.flink.storm.split; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -27,6 +26,11 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import java.util.Map; + +/** + * A bolt for splitting an input stream containing numbers based on whether they are even or odd. + */ public class SplitBolt extends BaseRichBolt { private static final long serialVersionUID = -6627606934204267173L; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java index 04cfeed..c002840 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.split; -import org.apache.storm.topology.TopologyBuilder; import org.apache.flink.storm.split.operators.RandomSpout; import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt; import org.apache.flink.storm.util.BoltFileSink; @@ -25,13 +25,18 @@ import org.apache.flink.storm.util.BoltPrintSink; import org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.TupleOutputFormatter; +import org.apache.storm.topology.TopologyBuilder; + +/** + * A simple topology that splits a stream of numbers based on their parity, and verifies the result. + */ public class SplitBoltTopology { - public final static String spoutId = "randomSource"; - public final static String boltId = "splitBolt"; - public final static String evenVerifierId = "evenVerifier"; - public final static String oddVerifierId = "oddVerifier"; - public final static String sinkId = "sink"; - private final static OutputFormatter formatter = new TupleOutputFormatter(); + private static final String spoutId = "randomSource"; + private static final String boltId = "splitBolt"; + private static final String evenVerifierId = "evenVerifier"; + private static final String oddVerifierId = "oddVerifier"; + private static final String sinkId = "sink"; + private static final OutputFormatter formatter = new TupleOutputFormatter(); public static TopologyBuilder buildTopology() { final TopologyBuilder builder = new TopologyBuilder(); http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java index 4da9708..d53493c 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java @@ -14,19 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.split; -import java.io.File; -import java.io.IOException; +package org.apache.flink.storm.split; import org.apache.flink.storm.split.SpoutSplitExample.Enrich; import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; + +/** + * Tests for split examples. + */ public class SplitITCase extends StreamingMultipleProgramsTestBase { private String output; http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java index 8671d2e..aa92a95 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.split; -import org.apache.storm.topology.TopologyBuilder; import org.apache.flink.storm.split.operators.RandomSpout; import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt; import org.apache.flink.storm.util.BoltFileSink; @@ -25,12 +25,18 @@ import org.apache.flink.storm.util.BoltPrintSink; import org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.TupleOutputFormatter; +import org.apache.storm.topology.TopologyBuilder; + +/** + * A simple topology similar to the {@link SplitBoltTopology}, except that the split streams are generated directly in + * a spout. + */ public class SplitSpoutTopology { - public final static String spoutId = "randomSplitSource"; - public final static String evenVerifierId = "evenVerifier"; - public final static String oddVerifierId = "oddVerifier"; - public final static String sinkId = "sink"; - private final static OutputFormatter formatter = new TupleOutputFormatter(); + private static final String spoutId = "randomSplitSource"; + private static final String evenVerifierId = "evenVerifier"; + private static final String oddVerifierId = "oddVerifier"; + private static final String sinkId = "sink"; + private static final OutputFormatter formatter = new TupleOutputFormatter(); public static TopologyBuilder buildTopology() { final TopologyBuilder builder = new TopologyBuilder(); http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java index 2cde11e..55c3bd3 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java @@ -14,15 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.split; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.utils.Utils; import org.apache.flink.storm.api.FlinkLocalCluster; import org.apache.flink.storm.api.FlinkTopology; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; + +/** + * An example using the {@link SplitBoltTopology}. + */ public class SplitStreamBoltLocal { - public final static String topologyId = "Bolt split stream example"; + private static final String topologyId = "Bolt split stream example"; // ************************************************************************* // PROGRAM http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java index be880d0..da6e574 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java @@ -14,15 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.split; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.utils.Utils; import org.apache.flink.storm.api.FlinkLocalCluster; import org.apache.flink.storm.api.FlinkTopology; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; + +/** + * An example using the {@link SplitSpoutTopology}. + */ public class SplitStreamSpoutLocal { - public final static String topologyId = "Spout split stream example"; + private static final String topologyId = "Spout split stream example"; // ************************************************************************* // PROGRAM http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java index 581f7c1..c861c9e 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java @@ -15,11 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.tests; -import org.apache.storm.Config; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; import org.apache.flink.storm.api.FlinkLocalCluster; import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.tests.operators.FiniteRandomSpout; @@ -28,6 +26,10 @@ import org.apache.flink.storm.util.BoltFileSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.util.MathUtils; + +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; import org.junit.Assert; import java.util.ArrayList; @@ -41,10 +43,10 @@ import java.util.List; */ public class StormFieldsGroupingITCase extends StreamingProgramTestBase { - private final static String topologyId = "FieldsGrouping Test"; - private final static String spoutId = "spout"; - private final static String boltId = "bolt"; - private final static String sinkId = "sink"; + private static final String topologyId = "FieldsGrouping Test"; + private static final String spoutId = "spout"; + private static final String boltId = "bolt"; + private static final String sinkId = "sink"; private String resultPath; @Override @@ -62,19 +64,19 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase { readAllResultLines(actualResults, resultPath, new String[0], false); //remove potential operator id prefix - for(int i = 0; i < actualResults.size(); ++i) { + for (int i = 0; i < actualResults.size(); ++i) { String s = actualResults.get(i); - if(s.contains(">")) { + if (s.contains(">")) { s = s.substring(s.indexOf(">") + 2); actualResults.set(i, s); } } - Assert.assertEquals(expectedResults.size(),actualResults.size()); + Assert.assertEquals(expectedResults.size(), actualResults.size()); Collections.sort(actualResults); Collections.sort(expectedResults); System.out.println(actualResults); - for(int i=0; i< actualResults.size(); ++i) { + for (int i = 0; i < actualResults.size(); ++i) { //compare against actual results with removed prefex (as it depends e.g. on the hash function used) Assert.assertEquals(expectedResults.get(i), actualResults.get(i)); }