[FLINK-2837][storm] various improvements for the compatibility layer - refactor to use Storm's topology builder - remove FlinkTopologyBuilder - instantiate context-based StreamExecutionEnvironment (local or remote) - remove some of the Flink and Storm behavior replicating classes - modify FlinkTopology to parse Storm topology directly - replace StormTestBase with StreamingTestBase - add print example - FlinkTopologyBuilder changes (check if all inputs are available before processing) - correct package typo - two input support - add join example - update docs
This closes #1398. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88636799 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88636799 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88636799 Branch: refs/heads/master Commit: 88636799ee68cae80b093436ff2e5eace0675b95 Parents: 20fe2af Author: Maximilian Michels <m...@apache.org> Authored: Thu Nov 12 14:39:45 2015 +0100 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Dec 2 10:39:20 2015 +0100 ---------------------------------------------------------------------- docs/apis/storm_compatibility.md | 12 +- flink-contrib/flink-storm-examples/pom.xml | 10 +- .../storm/excamation/ExclamationLocal.java | 72 --- .../storm/excamation/ExclamationTopology.java | 120 ----- .../storm/excamation/ExclamationWithBolt.java | 141 ------ .../storm/excamation/ExclamationWithSpout.java | 147 ------ .../excamation/operators/ExclamationBolt.java | 75 --- .../storm/exclamation/ExclamationLocal.java | 74 +++ .../storm/exclamation/ExclamationTopology.java | 120 +++++ .../storm/exclamation/ExclamationWithBolt.java | 140 ++++++ .../storm/exclamation/ExclamationWithSpout.java | 146 ++++++ .../exclamation/operators/ExclamationBolt.java | 75 +++ .../flink/storm/join/SingleJoinExample.java | 88 ++++ .../flink/storm/print/PrintSampleStream.java | 61 +++ .../flink/storm/util/FiniteFileSpout.java | 2 - .../flink/storm/util/FiniteInMemorySpout.java | 3 - .../storm/wordcount/BoltTokenizerWordCount.java | 1 - .../wordcount/BoltTokenizerWordCountPojo.java | 1 - .../BoltTokenizerWordCountWithNames.java | 1 - .../flink/storm/wordcount/WordCountLocal.java | 13 +- .../storm/wordcount/WordCountLocalByName.java | 13 +- .../wordcount/WordCountRemoteByClient.java | 7 +- .../wordcount/WordCountRemoteBySubmitter.java | 7 +- .../storm/wordcount/WordCountTopology.java | 11 +- .../operators/WordCountInMemorySpout.java | 7 +- .../exclamation/ExclamationWithBoltITCase.java 
| 5 +- .../exclamation/ExclamationWithSpoutITCase.java | 5 +- .../StormExclamationLocalITCase.java | 5 +- .../flink/storm/join/SingleJoinITCase.java | 58 +++ .../flink/storm/split/SplitBoltTopology.java | 8 +- .../flink/storm/split/SplitSpoutTopology.java | 8 +- .../flink/storm/split/SplitStreamBoltLocal.java | 16 +- .../storm/split/SplitStreamSpoutLocal.java | 16 +- .../storm/tests/StormFieldsGroupingITCase.java | 11 +- .../flink/storm/tests/operators/TaskIdBolt.java | 3 + .../apache/flink/storm/util/StormTestBase.java | 117 ----- .../wordcount/BoltTokenizerWordCountITCase.java | 5 +- .../BoltTokenizerWordCountPojoITCase.java | 5 +- .../BoltTokenizerWordCountWithNamesITCase.java | 5 +- .../wordcount/SpoutSourceWordCountITCase.java | 5 +- .../storm/wordcount/WordCountLocalITCase.java | 5 +- .../wordcount/WordCountLocalNamedITCase.java | 6 +- .../src/test/resources/log4j-test.properties | 2 +- .../org/apache/flink/storm/api/FlinkClient.java | 4 +- .../flink/storm/api/FlinkLocalCluster.java | 23 +- .../storm/api/FlinkOutputFieldsDeclarer.java | 1 - .../apache/flink/storm/api/FlinkTopology.java | 483 +++++++++++++++++-- .../flink/storm/api/FlinkTopologyBuilder.java | 421 ---------------- .../apache/flink/storm/util/StormConfig.java | 7 +- .../flink/storm/util/StormStreamSelector.java | 8 +- .../flink/storm/wrappers/BoltWrapper.java | 89 ++-- .../storm/wrappers/BoltWrapperTwoInput.java | 134 +++++ .../storm/wrappers/FlinkTopologyContext.java | 3 +- .../wrappers/SetupOutputFieldsDeclarer.java | 4 +- .../flink/storm/wrappers/SpoutCollector.java | 3 +- .../flink/storm/wrappers/SpoutWrapper.java | 8 +- .../apache/flink/storm/wrappers/StormTuple.java | 54 ++- .../storm/wrappers/WrapperSetupHelper.java | 23 +- .../api/FlinkOutputFieldsDeclarerTest.java | 2 - .../storm/api/FlinkTopologyBuilderTest.java | 81 ---- .../flink/storm/api/FlinkTopologyTest.java | 73 ++- .../org/apache/flink/storm/api/TestBolt.java | 4 +- .../org/apache/flink/storm/api/TestSpout.java | 4 +- 
.../flink/storm/api/TestTopologyBuilder.java | 29 -- .../storm/util/StormStreamSelectorTest.java | 6 +- .../apache/flink/storm/util/TestDummyBolt.java | 4 +- .../apache/flink/storm/util/TestDummySpout.java | 4 +- .../org/apache/flink/storm/util/TestSink.java | 8 +- .../flink/storm/wrappers/BoltCollectorTest.java | 2 - .../flink/storm/wrappers/BoltWrapperTest.java | 11 +- .../wrappers/FlinkTopologyContextTest.java | 6 +- .../wrappers/SetupOutputFieldsDeclarerTest.java | 2 - .../storm/wrappers/SpoutCollectorTest.java | 2 - .../flink/storm/wrappers/SpoutWrapperTest.java | 1 - .../flink/storm/wrappers/StormTupleTest.java | 37 +- .../storm/wrappers/WrapperSetupHelperTest.java | 36 +- 76 files changed, 1666 insertions(+), 1543 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/docs/apis/storm_compatibility.md ---------------------------------------------------------------------- diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md index 103b605..fe6bf35 100644 --- a/docs/apis/storm_compatibility.md +++ b/docs/apis/storm_compatibility.md @@ -57,20 +57,18 @@ See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example how t Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes: -- `TopologyBuilder` replaced by `FlinkTopologyBuilder` - `StormSubmitter` replaced by `FlinkSubmitter` - `NimbusClient` and `Client` replaced by `FlinkClient` - `LocalCluster` replaced by `FlinkLocalCluster` In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology. -The actual runtime code, ie, Spouts and Bolts, can be uses *unmodified*. 
-If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively. -If a parameter is not specified, the value is taken from `flink-conf.yaml`. +The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*. +If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> ~~~java -FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new FlinkTopology(); +TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder // actual topology assembling code and used Spouts/Bolts can be used as-is builder.setSpout("source", new FileSpout(inputFilePath)); @@ -81,12 +79,12 @@ builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("count Config conf = new Config(); if(runLocal) { // submit to test cluster FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("WordCount", conf, builder.createTopology()); + cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder)); } else { // submit to remote cluster // optional // conf.put(Config.NIMBUS_HOST, "remoteHost"); // conf.put(Config.NIMBUS_THRIFT_PORT, 6123); - FlinkSubmitter.submitTopology("WordCount", conf, builder.createTopology()); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology()); + FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder)); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology()); } ~~~ </div> 
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml index cee6cac..6f3a050 100644 --- a/flink-contrib/flink-storm-examples/pom.xml +++ b/flink-contrib/flink-storm-examples/pom.xml @@ -61,6 +61,14 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-starter</artifactId> + <version>0.9.4</version> + </dependency> + + </dependencies> <build> @@ -226,7 +234,7 @@ under the License. </execution> <!-- WordCount Storm topology--> - <!-- Example for whole topologies (ie, if FlinkTopologyBuilder is used) --> + <!-- Example for whole topologies (ie, if FlinkTopology is used) --> <!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar. However, we excluded 'defaults.yaml' in dependency-plugin to get clean Eclipse environment. Thus, 'defaults.yaml' is not available for maven-jar-plugin. http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java deleted file mode 100644 index 56a0125..0000000 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.storm.excamation; - -import backtype.storm.Config; -import backtype.storm.utils.Utils; - -import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.storm.api.FlinkTopologyBuilder; -import org.apache.flink.storm.excamation.operators.ExclamationBolt; - -/** - * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text files in a streaming - * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology} and submitted to - * Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}. - * <p> - * This example shows how to run program directly within Java, thus it cannot be used to submit a - * {@link backtype.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink). - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>ExclamationLocal <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}. 
- * <p> - * This example shows how to: - * <ul> - * <li>run a regular Storm program locally on Flink</li> - * </ul> - */ -public class ExclamationLocal { - - public final static String topologyId = "Streaming Exclamation"; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!ExclamationTopology.parseParameters(args)) { - return; - } - - // build Topology the Storm way - final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology(); - - // execute program locally - Config conf = new Config(); - conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation()); - final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, conf, builder.createTopology()); - - Utils.sleep(10 * 1000); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java deleted file mode 100644 index 9d94f5c..0000000 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.storm.excamation; - -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.storm.api.FlinkTopologyBuilder; -import org.apache.flink.storm.excamation.operators.ExclamationBolt; -import org.apache.flink.storm.util.FiniteFileSpout; -import org.apache.flink.storm.util.FiniteInMemorySpout; -import org.apache.flink.storm.util.OutputFormatter; -import org.apache.flink.storm.util.SimpleOutputFormatter; -import org.apache.flink.storm.util.BoltFileSink; -import org.apache.flink.storm.util.BoltPrintSink; - -/** - * Implements the "Exclamation" program that attaches two exclamation marks to every line of a text files in a streaming - * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] <text path> - * <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
- * <p> - * This example shows how to: - * <ul> - * <li>construct a regular Storm topology as Flink program</li> - * <li>make use of the FiniteSpout interface</li> - * </ul> - */ -public class ExclamationTopology { - - public final static String spoutId = "source"; - public final static String firstBoltId = "exclamation1"; - public final static String secondBoltId = "exclamation2"; - public final static String sinkId = "sink"; - private final static OutputFormatter formatter = new SimpleOutputFormatter(); - - public static FlinkTopologyBuilder buildTopology() { - final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); - - // get input data - if (fileInputOutput) { - // read the text file from given input path - final String[] tokens = textPath.split(":"); - final String inputFile = tokens[tokens.length - 1]; - builder.setSpout(spoutId, new FiniteFileSpout(inputFile)); - } else { - builder.setSpout(spoutId, new FiniteInMemorySpout(WordCountData.WORDS)); - } - - builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId); - builder.setBolt(secondBoltId, new ExclamationBolt(), 2).shuffleGrouping(firstBoltId); - - // emit result - if (fileInputOutput) { - // read the text file from given input path - final String[] tokens = outputPath.split(":"); - final String outputFile = tokens[tokens.length - 1]; - builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter)) - .shuffleGrouping(secondBoltId); - } else { - builder.setBolt(sinkId, new BoltPrintSink(formatter), 4) - .shuffleGrouping(secondBoltId); - } - - return builder; - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileInputOutput = false; - private static String textPath; - private static String outputPath; - private static int exclamationNum = 3; - - static int getExclamation() { - return exclamationNum; - } - - static 
boolean parseParameters(final String[] args) { - - if (args.length > 0) { - // parse input arguments - fileInputOutput = true; - if (args.length == 3) { - textPath = args[0]; - outputPath = args[1]; - exclamationNum = Integer.parseInt(args[2]); - } else { - System.err.println("Usage: StormExclamation* <text path> <result path> <number of exclamation marks>"); - return false; - } - } else { - System.out.println("Executing StormExclamation example with built-in default data"); - System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: StormExclamation <text path> <result path> <number of exclamation marks>"); - } - - return true; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java deleted file mode 100644 index 19fe977..0000000 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.storm.excamation; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.storm.excamation.operators.ExclamationBolt; -import org.apache.flink.storm.util.StormConfig; -import org.apache.flink.storm.wrappers.BoltWrapper; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import backtype.storm.utils.Utils; - -/** - * Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text files in a streaming - * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: - * <code>ExclamationWithmBolt <text path> <result path> <number of exclamation marks></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData} with x=2. 
- * <p> - * This example shows how to: - * <ul> - * <li>use a Bolt within a Flink Streaming program</li> - * <li>how to configure a Bolt using StormConfig</li> - * </ul> - */ -public class ExclamationWithBolt { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // set Storm configuration - StormConfig config = new StormConfig(); - config.put(ExclamationBolt.EXCLAMATION_COUNT, new Integer(exclamationNum)); - env.getConfig().setGlobalJobParameters(config); - - // get input data - final DataStream<String> text = getTextDataStream(env); - - final DataStream<String> exclaimed = text - .transform("StormBoltTokenizer", - TypeExtractor.getForObject(""), - new BoltWrapper<String, String>(new ExclamationBolt(), - new String[] { Utils.DEFAULT_STREAM_ID })) - .map(new ExclamationMap()); - - // emit result - if (fileOutput) { - exclaimed.writeAsText(outputPath); - } else { - exclaimed.print(); - } - - // execute program - env.execute("Streaming WordCount with bolt tokenizer"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - private static class ExclamationMap implements MapFunction<String, String> { - private static final long serialVersionUID = 4614754344067170619L; - - @Override - public String map(String value) throws Exception { - return value + "!!!"; - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean 
fileOutput = false; - private static String textPath; - private static String outputPath; - private static int exclamationNum = 2; - - private static boolean parseParameters(final String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 3) { - textPath = args[0]; - outputPath = args[1]; - exclamationNum = Integer.parseInt(args[2]); - } else { - System.err.println("Usage: ExclamationWithBolt <text path> <result path> <number of exclamation marks>"); - return false; - } - } else { - System.out.println("Executing ExclamationWithBolt example with built-in default data"); - System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: ExclamationWithBolt <text path> <result path> <number of exclamation marks>"); - } - return true; - } - - private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) { - if (fileOutput) { - // read the text file from given input path - return env.readTextFile(textPath); - } - - return env.fromElements(WordCountData.WORDS); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java deleted file mode 100644 index a196995..0000000 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.storm.excamation; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.storm.util.FiniteFileSpout; -import org.apache.flink.storm.util.FiniteInMemorySpout; -import org.apache.flink.storm.util.StormConfig; -import org.apache.flink.storm.wrappers.SpoutWrapper; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import backtype.storm.utils.Utils; - -/** - * Implements the "Exclamation" program that attaches six exclamation marks to every line of a text files in a streaming - * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}. - * <p> - * The input is a plain text file with lines separated by newline characters. - * <p> - * Usage: <code>ExclamationWithSpout <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
- * <p> - * This example shows how to: - * <ul> - * <li>use a Storm spout within a Flink Streaming program</li> - * <li>make use of the FiniteSpout interface</li> - * <li>make use of the FiniteSpout interface</li> - * <li>how to configure a Spout using StormConfig</li> - * </ul> - */ -public class ExclamationWithSpout { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // get input data - final DataStream<String> text = getTextDataStream(env); - - final DataStream<String> exclaimed = text - .map(new ExclamationMap()) - .map(new ExclamationMap()); - - // emit result - if (fileOutput) { - exclaimed.writeAsText(outputPath); - } else { - exclaimed.print(); - } - - // execute program - env.execute("Streaming Exclamation with Storm spout source"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - private static class ExclamationMap implements MapFunction<String, String> { - private static final long serialVersionUID = -684993133807698042L; - - @Override - public String map(String value) throws Exception { - return value + "!!!"; - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(final String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; 
- if (args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: ExclamationWithSpout <text path> <result path>"); - return false; - } - } else { - System.out.println("Executing ExclamationWithSpout example with built-in default data"); - System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: ExclamationWithSpout <text path> <result path>"); - } - return true; - } - - private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) { - if (fileOutput) { - final String[] tokens = textPath.split(":"); - final String inputFile = tokens[tokens.length - 1]; - - // set Storm configuration - StormConfig config = new StormConfig(); - config.put(FiniteFileSpout.INPUT_FILE_PATH, inputFile); - env.getConfig().setGlobalJobParameters(config); - - return env.addSource( - new SpoutWrapper<String>(new FiniteFileSpout(), - new String[] { Utils.DEFAULT_STREAM_ID }), - TypeExtractor.getForClass(String.class)).setParallelism(1); - } - - return env.addSource( - new SpoutWrapper<String>(new FiniteInMemorySpout( - WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }), - TypeExtractor.getForClass(String.class)).setParallelism(1); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java deleted file mode 100644 index cfc49a1..0000000 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.storm.excamation.operators; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.util.Map; - -public class ExclamationBolt implements IRichBolt { - private final static long serialVersionUID = -6364882114201311380L; - - public final static String EXCLAMATION_COUNT = "exclamation.count"; - - private OutputCollector collector; - private String exclamation; - - @SuppressWarnings("rawtypes") - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - - Object count = conf.get(EXCLAMATION_COUNT); - if (count != null) { - int exclamationNum = (Integer) count; - StringBuilder builder = new StringBuilder(); - for (int index = 0; index < exclamationNum; ++index) { - builder.append('!'); - } - this.exclamation = builder.toString(); - } else { - this.exclamation = "!"; - } - } - - @Override - public void cleanup() { - } - - @Override - public void execute(Tuple 
tuple) { - collector.emit(tuple, new Values(tuple.getString(0) + this.exclamation)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java new file mode 100644 index 0000000..3df4290 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.exclamation; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.storm.exclamation.operators.ExclamationBolt; + +/** + * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text file in a streaming + * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology} and submitted to + * Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}. + * <p> + * This example shows how to run a program directly within Java, thus it cannot be used to submit a + * {@link backtype.storm.generated.StormTopology} via Flink command line clients (i.e., bin/flink). + * <p> + * The input is a plain text file with lines separated by newline characters. + * <p> + * Usage: <code>ExclamationLocal <text path> <result path> <number of exclamation marks></code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}. 
+ * <p> + * This example shows how to: + * <ul> + * <li>run a regular Storm program locally on Flink</li> + * </ul> + */ +public class ExclamationLocal { + + public final static String topologyId = "Streaming Exclamation"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if (!ExclamationTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way + final TopologyBuilder builder = ExclamationTopology.buildTopology(); + + // execute program locally + Config conf = new Config(); + conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation()); + + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); + + Utils.sleep(10 * 1000); + cluster.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java new file mode 100644 index 0000000..43f526b --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.exclamation; + +import backtype.storm.topology.TopologyBuilder; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.exclamation.operators.ExclamationBolt; +import org.apache.flink.storm.util.BoltFileSink; +import org.apache.flink.storm.util.BoltPrintSink; +import org.apache.flink.storm.util.FiniteFileSpout; +import org.apache.flink.storm.util.FiniteInMemorySpout; +import org.apache.flink.storm.util.OutputFormatter; +import org.apache.flink.storm.util.SimpleOutputFormatter; + +/** + * Implements the "Exclamation" program that attaches two exclamation marks to every line of a text file in a streaming + * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}. + * <p> + * The input is a plain text file with lines separated by newline characters. + * <p> + * Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] <text path> + * <result path> <number of exclamation marks></code><br> + * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
+ * <p> + * This example shows how to: + * <ul> + * <li>construct a regular Storm topology as Flink program</li> + * <li>make use of the FiniteSpout interface</li> + * </ul> + */ +public class ExclamationTopology { + + public final static String spoutId = "source"; + public final static String firstBoltId = "exclamation1"; + public final static String secondBoltId = "exclamation2"; + public final static String sinkId = "sink"; + private final static OutputFormatter formatter = new SimpleOutputFormatter(); + + public static TopologyBuilder buildTopology() { + final TopologyBuilder builder = new TopologyBuilder(); + + // get input data + if (fileInputOutput) { + // read the text file from given input path + final String[] tokens = textPath.split(":"); + final String inputFile = tokens[tokens.length - 1]; + builder.setSpout(spoutId, new FiniteFileSpout(inputFile)); + } else { + builder.setSpout(spoutId, new FiniteInMemorySpout(WordCountData.WORDS)); + } + + builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId); + builder.setBolt(secondBoltId, new ExclamationBolt(), 2).shuffleGrouping(firstBoltId); + + // emit result + if (fileInputOutput) { + // read the text file from given input path + final String[] tokens = outputPath.split(":"); + final String outputFile = tokens[tokens.length - 1]; + builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter)) + .shuffleGrouping(secondBoltId); + } else { + builder.setBolt(sinkId, new BoltPrintSink(formatter), 4) + .shuffleGrouping(secondBoltId); + } + + return builder; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileInputOutput = false; + private static String textPath; + private static String outputPath; + private static int exclamationNum = 3; + + static int getExclamation() { + return exclamationNum; + } + + static boolean 
parseParameters(final String[] args) { + + if (args.length > 0) { + // parse input arguments + fileInputOutput = true; + if (args.length == 3) { + textPath = args[0]; + outputPath = args[1]; + exclamationNum = Integer.parseInt(args[2]); + } else { + System.err.println("Usage: StormExclamation* <text path> <result path> <number of exclamation marks>"); + return false; + } + } else { + System.out.println("Executing StormExclamation example with built-in default data"); + System.out.println(" Provide parameters to read input data from a file"); + System.out.println(" Usage: StormExclamation <text path> <result path> <number of exclamation marks>"); + } + + return true; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java new file mode 100644 index 0000000..b47c0fa --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.exclamation; + +import backtype.storm.utils.Utils; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.exclamation.operators.ExclamationBolt; +import org.apache.flink.storm.util.StormConfig; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text file in a streaming + * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}. + * <p> + * The input is a plain text file with lines separated by newline characters. + * <p> + * Usage: + * <code>ExclamationWithBolt <text path> <result path> <number of exclamation marks></code><br> + * If no parameters are provided, the program is run with default data from {@link WordCountData} with x=2. 
+ * <p> + * This example shows how to: + * <ul> + * <li>use a Bolt within a Flink Streaming program</li> + * <li>how to configure a Bolt using StormConfig</li> + * </ul> + */ +public class ExclamationWithBolt { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // set Storm configuration + StormConfig config = new StormConfig(); + config.put(ExclamationBolt.EXCLAMATION_COUNT, new Integer(exclamationNum)); + env.getConfig().setGlobalJobParameters(config); + + // get input data + final DataStream<String> text = getTextDataStream(env); + + final DataStream<String> exclaimed = text + .transform("StormBoltTokenizer", + TypeExtractor.getForObject(""), + new BoltWrapper<String, String>(new ExclamationBolt(), + new String[] { Utils.DEFAULT_STREAM_ID })) + .map(new ExclamationMap()); + + // emit result + if (fileOutput) { + exclaimed.writeAsText(outputPath); + } else { + exclaimed.print(); + } + + // execute program + env.execute("Streaming WordCount with bolt tokenizer"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + private static class ExclamationMap implements MapFunction<String, String> { + private static final long serialVersionUID = 4614754344067170619L; + + @Override + public String map(String value) throws Exception { + return value + "!!!"; + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean 
fileOutput = false; + private static String textPath; + private static String outputPath; + private static int exclamationNum = 2; + + private static boolean parseParameters(final String[] args) { + + if (args.length > 0) { + // parse input arguments + fileOutput = true; + if (args.length == 3) { + textPath = args[0]; + outputPath = args[1]; + exclamationNum = Integer.parseInt(args[2]); + } else { + System.err.println("Usage: ExclamationWithBolt <text path> <result path> <number of exclamation marks>"); + return false; + } + } else { + System.out.println("Executing ExclamationWithBolt example with built-in default data"); + System.out.println(" Provide parameters to read input data from a file"); + System.out.println(" Usage: ExclamationWithBolt <text path> <result path> <number of exclamation marks>"); + } + return true; + } + + private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) { + if (fileOutput) { + // read the text file from given input path + return env.readTextFile(textPath); + } + + return env.fromElements(WordCountData.WORDS); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java new file mode 100644 index 0000000..380d9da --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.exclamation; + +import backtype.storm.utils.Utils; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.util.FiniteFileSpout; +import org.apache.flink.storm.util.FiniteInMemorySpout; +import org.apache.flink.storm.util.StormConfig; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Implements the "Exclamation" program that attaches six exclamation marks to every line of a text file in a streaming + * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}. + * <p> + * The input is a plain text file with lines separated by newline characters. + * <p> + * Usage: <code>ExclamationWithSpout <text path> <result path></code><br> + * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
+ * <p> + * This example shows how to: + * <ul> + * <li>use a Storm spout within a Flink Streaming program</li> + * <li>make use of the FiniteSpout interface</li> + * <li>make use of the FiniteSpout interface</li> + * <li>how to configure a Spout using StormConfig</li> + * </ul> + */ +public class ExclamationWithSpout { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // get input data + final DataStream<String> text = getTextDataStream(env); + + final DataStream<String> exclaimed = text + .map(new ExclamationMap()) + .map(new ExclamationMap()); + + // emit result + if (fileOutput) { + exclaimed.writeAsText(outputPath); + } else { + exclaimed.print(); + } + + // execute program + env.execute("Streaming Exclamation with Storm spout source"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + private static class ExclamationMap implements MapFunction<String, String> { + private static final long serialVersionUID = -684993133807698042L; + + @Override + public String map(String value) throws Exception { + return value + "!!!"; + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String textPath; + private static String outputPath; + + private static boolean parseParameters(final String[] args) { + + if (args.length > 0) { + // parse input arguments + fileOutput = true; 
+ if (args.length == 2) { + textPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: ExclamationWithSpout <text path> <result path>"); + return false; + } + } else { + System.out.println("Executing ExclamationWithSpout example with built-in default data"); + System.out.println(" Provide parameters to read input data from a file"); + System.out.println(" Usage: ExclamationWithSpout <text path> <result path>"); + } + return true; + } + + private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) { + if (fileOutput) { + final String[] tokens = textPath.split(":"); + final String inputFile = tokens[tokens.length - 1]; + + // set Storm configuration + StormConfig config = new StormConfig(); + config.put(FiniteFileSpout.INPUT_FILE_PATH, inputFile); + env.getConfig().setGlobalJobParameters(config); + + return env.addSource( + new SpoutWrapper<String>(new FiniteFileSpout(), + new String[] { Utils.DEFAULT_STREAM_ID }), + TypeExtractor.getForClass(String.class)).setParallelism(1); + } + + return env.addSource( + new SpoutWrapper<String>(new FiniteInMemorySpout( + WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }), + TypeExtractor.getForClass(String.class)).setParallelism(1); + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java new file mode 100644 index 0000000..9bc00d2 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.exclamation.operators; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import java.util.Map; + +public class ExclamationBolt implements IRichBolt { + private final static long serialVersionUID = -6364882114201311380L; + + public final static String EXCLAMATION_COUNT = "exclamation.count"; + + private OutputCollector collector; + private String exclamation; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map conf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + + Object count = conf.get(EXCLAMATION_COUNT); + if (count != null) { + int exclamationNum = (Integer) count; + StringBuilder builder = new StringBuilder(); + for (int index = 0; index < exclamationNum; ++index) { + builder.append('!'); + } + this.exclamation = builder.toString(); + } else { + this.exclamation = "!"; + } + } + + @Override + public void cleanup() { + } + + @Override + public void execute(Tuple 
tuple) { + collector.emit(tuple, new Values(tuple.getString(0) + this.exclamation)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java new file mode 100644 index 0000000..eaa91ee --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.join; + +import backtype.storm.Config; +import backtype.storm.testing.FeederSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.storm.util.BoltFileSink; +import org.apache.flink.storm.util.TupleOutputFormatter; +import storm.starter.bolt.PrinterBolt; +import storm.starter.bolt.SingleJoinBolt; + + +public class SingleJoinExample { + + public static void main(String[] args) throws Exception { + final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender", "hobbies")); + final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("gender", genderSpout); + + builder.setSpout("age", ageSpout); + + builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))) + .fieldsGrouping("gender", new Fields("id")) + .fieldsGrouping("age", new Fields("id")); + + // emit result + if (args.length > 0) { + // read the text file from given input path + builder.setBolt("fileOutput", new BoltFileSink(args[0], new TupleOutputFormatter())) + .shuffleGrouping("join"); + } else { + builder.setBolt("print", new PrinterBolt()).shuffleGrouping("join"); + } + + Config conf = new Config(); + conf.setDebug(true); + + String[] hobbies = new String[] {"reading", "biking", "travelling", "watching tv"}; + + for (int i = 0; i < 10; i++) { + String gender; + if (i % 2 == 0) { + gender = "male"; + } + else { + gender = "female"; + } + genderSpout.feed(new Values(i, gender, hobbies[i % hobbies.length])); + } + + for (int i = 9; i >= 0; i--) { + ageSpout.feed(new Values(i, i + 20)); + } + + + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + cluster.submitTopology("joinTopology", conf, 
FlinkTopology.createTopology(builder)); + + Utils.sleep(10 * 1000); + + cluster.shutdown(); + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java new file mode 100644 index 0000000..598a8d9 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.print; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import storm.starter.bolt.PrinterBolt; +import storm.starter.spout.TwitterSampleSpout; + +import java.util.Arrays; + +/** + * Prints incoming tweets. Tweets can be filtered by keywords. 
+ */ +public class PrintSampleStream { + public static void main(String[] args) throws Exception { + String consumerKey = args[0]; + String consumerSecret = args[1]; + String accessToken = args[2]; + String accessTokenSecret = args[3]; + + // keywords start with the 5th parameter + String[] keyWords = Arrays.copyOfRange(args, 4, args.length); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, consumerSecret, + accessToken, accessTokenSecret, keyWords)); + builder.setBolt("print", new PrinterBolt()) + .shuffleGrouping("twitter"); + + + Config conf = new Config(); + + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + cluster.submitTopology("Print", conf, FlinkTopology.createTopology(builder)); + + Utils.sleep(10 * 1000); + + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java index 75450c4..4b41f8a 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java @@ -23,8 +23,6 @@ import backtype.storm.tuple.Values; import java.io.IOException; import java.util.Map; -import org.apache.flink.storm.util.FiniteSpout; - /** * Implements a Spout that reads data from a given local file. The spout stops automatically * when it reached the end of the file. 
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java index 1490872..ff89a41 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java @@ -18,9 +18,6 @@ package org.apache.flink.storm.util; -import org.apache.flink.storm.util.FiniteSpout; - - /** * Implements a Spout that reads String[] data stored in memory. The Spout stops automatically when it emitted all of * the data. http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java index 6ee7fd9..cccf4c0 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java @@ -18,7 +18,6 @@ package org.apache.flink.storm.wordcount; import backtype.storm.topology.IRichBolt; - import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.examples.java.wordcount.util.WordCountData; 
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java index e093714..9b0d4ee 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java @@ -18,7 +18,6 @@ package org.apache.flink.storm.wordcount; import backtype.storm.topology.IRichBolt; - import org.apache.flink.api.java.io.CsvInputFormat; import org.apache.flink.api.java.io.PojoCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple2; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java index f5a1a35..50d4518 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java @@ -19,7 +19,6 @@ package org.apache.flink.storm.wordcount; import backtype.storm.topology.IRichBolt; import backtype.storm.tuple.Fields; - import org.apache.flink.api.java.io.CsvInputFormat; import 
org.apache.flink.api.java.io.TupleCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java index 0a7dfa0..6da3e3c 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java @@ -19,11 +19,11 @@ package org.apache.flink.storm.wordcount; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; - import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.api.FlinkTopology; /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming @@ -35,7 +35,7 @@ import org.apache.flink.storm.api.FlinkTopologyBuilder; * <p> * The input is a plain text file with lines separated by newline characters. * <p> - * Usage: <code>WordCountLocal <text path> <result path></code><br> + * Usage: <code>WordCount <text path> <result path></code><br> * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
* <p> * This example shows how to: @@ -57,16 +57,13 @@ public class WordCountLocal { } // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + final TopologyBuilder builder = WordCountTopology.buildTopology(); - // execute program locally final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, null, builder.createTopology()); + cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder)); Utils.sleep(10 * 1000); - // TODO kill does no do anything so far - cluster.killTopology(topologyId); cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java index 67f4bbe..e6fd141 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java @@ -19,11 +19,11 @@ package org.apache.flink.storm.wordcount; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; - import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.api.FlinkTopology; /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming @@ -58,17 +58,14 @@ public class WordCountLocalByName 
{ } // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(false); + final TopologyBuilder builder = WordCountTopology.buildTopology(false); - // execute program locally final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, null, builder.createTopology()); + cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder)); Utils.sleep(10 * 1000); - // TODO kill does no do anything so far - cluster.killTopology(topologyId); cluster.shutdown(); - } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java index 8cb7cdd..c7c7f7c 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java @@ -22,12 +22,13 @@ import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.NotAliveException; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.storm.api.FlinkClient; -import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.api.FlinkTopology; /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text 
files in a streaming @@ -63,7 +64,7 @@ public class WordCountRemoteByClient { } // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + final TopologyBuilder builder = WordCountTopology.buildTopology(); // execute program on Flink cluster final Config conf = new Config(); @@ -73,7 +74,7 @@ public class WordCountRemoteByClient { conf.put(Config.NIMBUS_THRIFT_PORT, 6123); final FlinkClient cluster = FlinkClient.getConfiguredClient(conf); - cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology()); + cluster.submitTopology(topologyId, uploadedJarLocation, FlinkTopology.createTopology(builder)); Utils.sleep(5 * 1000); http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java index a20843e..eb2713d 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java @@ -21,10 +21,11 @@ import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.storm.api.FlinkClient; import org.apache.flink.storm.api.FlinkSubmitter; -import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.api.FlinkTopology; /** * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming @@ -57,7 +58,7 @@ public class WordCountRemoteBySubmitter { } // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + final TopologyBuilder builder = WordCountTopology.buildTopology(); // execute program on Flink cluster final Config conf = new Config(); @@ -71,7 +72,7 @@ public class WordCountRemoteBySubmitter { // The user jar file must be specified via JVM argument if executed via Java. // => -Dstorm.jar=target/WordCount-StormTopology.jar // If bin/flink is used, the jar file is detected automatically. - FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology()); + FlinkSubmitter.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); Thread.sleep(5 * 1000); http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java index 138df65..3e7c257 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java @@ -18,13 +18,12 @@ package org.apache.flink.storm.wordcount; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; - import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.storm.api.FlinkTopologyBuilder; -import org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.BoltFileSink; import org.apache.flink.storm.util.BoltPrintSink; +import 
org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.TupleOutputFormatter; import org.apache.flink.storm.wordcount.operators.BoltCounter; import org.apache.flink.storm.wordcount.operators.BoltCounterByName; @@ -55,13 +54,13 @@ public class WordCountTopology { public final static String sinkId = "sink"; private final static OutputFormatter formatter = new TupleOutputFormatter(); - public static FlinkTopologyBuilder buildTopology() { + public static TopologyBuilder buildTopology() { return buildTopology(true); } - public static FlinkTopologyBuilder buildTopology(boolean indexOrName) { + public static TopologyBuilder buildTopology(boolean indexOrName) { - final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + final TopologyBuilder builder = new TopologyBuilder(); // get input data if (fileInputOutput) { http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java index eb96160..c06c268 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java @@ -17,16 +17,15 @@ package org.apache.flink.storm.wordcount.operators; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.storm.util.InMemorySpout; - import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import 
org.apache.flink.storm.util.FiniteInMemorySpout; /** * Implements a Spout that reads data from {@link WordCountData#WORDS}. */ -public final class WordCountInMemorySpout extends InMemorySpout<String> { +public final class WordCountInMemorySpout extends FiniteInMemorySpout { private static final long serialVersionUID = 8832143302409465843L; public WordCountInMemorySpout() { http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java index 781396c..5a37572 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java @@ -18,12 +18,11 @@ package org.apache.flink.storm.exclamation; -import org.apache.flink.storm.excamation.ExclamationWithBolt; import org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class ExclamationWithBoltITCase extends StormTestBase { +public class ExclamationWithBoltITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java index 36b8aed..c2b0467 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java @@ -18,12 +18,11 @@ package org.apache.flink.storm.exclamation; -import org.apache.flink.storm.excamation.ExclamationWithSpout; import org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class ExclamationWithSpoutITCase extends StormTestBase { +public class ExclamationWithSpoutITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java index cec276f..049c881 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java @@ -18,12 +18,11 @@ package org.apache.flink.storm.exclamation; -import org.apache.flink.storm.excamation.ExclamationLocal; import 
org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class StormExclamationLocalITCase extends StormTestBase { +public class StormExclamationLocalITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java new file mode 100644 index 0000000..b51db2c --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.join; + +import com.google.common.base.Joiner; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class SingleJoinITCase extends StreamingProgramTestBase { + + protected static String expectedOutput[] = { + "(male,20)", + "(female,21)", + "(male,22)", + "(female,23)", + "(male,24)", + "(female,25)", + "(male,26)", + "(female,27)", + "(male,28)", + "(female,29)" + }; + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + this.resultPath = this.getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), this.resultPath); + } + + @Override + protected void testProgram() throws Exception { + // We need to remove the file scheme because we can't use the Flink file system. + // (to remain compatible with Storm) + SingleJoinExample.main(new String[]{ this.resultPath.replace("file:", "") }); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java index d0973cb..38147b2 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java @@ -17,12 +17,12 @@ */ package org.apache.flink.storm.split; -import org.apache.flink.storm.api.FlinkTopologyBuilder; +import backtype.storm.topology.TopologyBuilder; import org.apache.flink.storm.split.operators.RandomSpout; import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt; 
-import org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.BoltFileSink; import org.apache.flink.storm.util.BoltPrintSink; +import org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.TupleOutputFormatter; public class SplitBoltTopology { @@ -33,8 +33,8 @@ public class SplitBoltTopology { public final static String sinkId = "sink"; private final static OutputFormatter formatter = new TupleOutputFormatter(); - public static FlinkTopologyBuilder buildTopology() { - final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + public static TopologyBuilder buildTopology() { + final TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(spoutId, new RandomSpout(false, seed)); builder.setBolt(boltId, new SplitBolt()).shuffleGrouping(spoutId);