http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java new file mode 100644 index 0000000..71a5e8d --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount; + +import backtype.storm.LocalCluster; +import backtype.storm.generated.StormTopology; +import backtype.storm.utils.Utils; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopologyBuilder; + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. 
The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the + * same way as to a Storm {@link LocalCluster}. In contrast to {@link WordCountLocal} all bolts access the field of + * input tuples by name instead of index. + * <p/> + * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology} + * via Flink command line clients (ie, bin/flink). + * <p/> + * <p/> + * The input is a plain text file with lines separated by newline characters. + * <p/> + * <p/> + * Usage: <code>WordCountLocalByName <text path> <result path></code><br> + * If no parameters are provided, the program is run with default data from {@link WordCountData}. + * <p/> + * <p/> + * This example shows how to: + * <ul> + * <li>run a regular Storm program locally on Flink + * </ul> + */ +public class WordCountLocalByName { + public final static String topologyId = "Storm WordCountName"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if (!WordCountTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way; "false" selects the bolts that access tuple fields by name + final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(false); + + // execute program locally + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + cluster.submitTopology(topologyId, null, builder.createTopology()); + + Utils.sleep(10 * 1000); + + // TODO kill does not do anything so far + cluster.killTopology(topologyId); + cluster.shutdown(); + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java new file mode 100644 index 0000000..2e4fb03 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.wordcount; + +import backtype.storm.Config; +import backtype.storm.generated.AlreadyAliveException; +import backtype.storm.generated.InvalidTopologyException; +import backtype.storm.generated.NotAliveException; +import backtype.storm.generated.StormTopology; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.api.FlinkClient; +import org.apache.flink.storm.api.FlinkTopologyBuilder; + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the + * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote. + * <p/> + * This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via + * Flink command line clients (ie, bin/flink). + * <p/> + * <p/> + * The input is a plain text file with lines separated by newline characters. + * <p/> + * <p/> + * Usage: <code>WordCountRemoteByClient <text path> <result path></code><br> + * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
+ * <p/> + * <p/> + * This example shows how to: + * <ul> + * <li>submit a regular Storm program to a local or remote Flink cluster.</li> + * </ul> + */ +public class WordCountRemoteByClient { + public final static String topologyId = "Storm WordCount"; + private final static String uploadedJarLocation = "target/WordCount-StormTopology.jar"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException, + NotAliveException { + + if (!WordCountTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way + final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + + // execute program on Flink cluster + final Config conf = new Config(); + // can be changed to a remote address + conf.put(Config.NIMBUS_HOST, "localhost"); + // use default Flink jobmanager.rpc.port + conf.put(Config.NIMBUS_THRIFT_PORT, 6123); + + final FlinkClient cluster = FlinkClient.getConfiguredClient(conf); + cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology()); + + Utils.sleep(5 * 1000); + + cluster.killTopology(topologyId); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java new file mode 100644 index 0000000..173074c --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java @@ -0,0 +1,84 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.api.FlinkClient; +import org.apache.flink.storm.api.FlinkSubmitter; +import org.apache.flink.storm.api.FlinkTopologyBuilder; + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the + * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote. + * <p/> + * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink). + * <p/> + * <p/> + * The input is a plain text file with lines separated by newline characters. + * <p/> + * <p/> + * Usage: <code>WordCountRemoteBySubmitter <text path> <result path></code><br> + * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
+ * <p/> + * <p/> + * This example shows how to: + * <ul> + * <li>submit a regular Storm program to a local or remote Flink cluster.</li> + * </ul> + */ +public class WordCountRemoteBySubmitter { + public final static String topologyId = "Storm WordCount"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if (!WordCountTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way + final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + + // execute program on Flink cluster + final Config conf = new Config(); + // We can set JobManager host/port values manually or leave them unset. + // If they are not set and the program is + // - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter + // - executed via bin/flink, values from flink-conf.yaml are set by FlinkSubmitter. + // conf.put(Config.NIMBUS_HOST, "localhost"); + // conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123)); + + // The user jar file must be specified via JVM argument if executed via Java. + // => -Dstorm.jar=target/WordCount-StormTopology.jar + // If bin/flink is used, the jar file is detected automatically. 
+ FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology()); + + Thread.sleep(5 * 1000); + + FlinkClient.getConfiguredClient(conf).killTopology(topologyId); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java new file mode 100644 index 0000000..8ee374d --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.wordcount; + +import backtype.storm.generated.StormTopology; +import backtype.storm.tuple.Fields; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.util.OutputFormatter; +import org.apache.flink.storm.util.BoltFileSink; +import org.apache.flink.storm.util.BoltPrintSink; +import org.apache.flink.storm.util.TupleOutputFormatter; +import org.apache.flink.storm.wordcount.operators.BoltCounter; +import org.apache.flink.storm.wordcount.operators.BoltCounterByName; +import org.apache.flink.storm.wordcount.operators.BoltTokenizer; +import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName; +import org.apache.flink.storm.wordcount.operators.WordCountFileSpout; +import org.apache.flink.storm.wordcount.operators.WordCountInMemorySpout; + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming + * fashion. The program is constructed as a regular {@link StormTopology}. + * <p/> + * <p/> + * The input is a plain text file with lines separated by newline characters. + * <p/> + * <p/> + * Usage: + * <code>WordCount[Local|LocalByName|RemoteByClient|RemoteBySubmitter] <text path> <result path></code><br> + * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
+ * <p/> + * <p/> + * This example shows how to: + * <ul> + * <li>how to construct a regular Storm topology as Flink program</li> + * </ul> + */ +public class WordCountTopology { + public final static String spoutId = "source"; + public final static String tokenierzerId = "tokenizer"; + public final static String counterId = "counter"; + public final static String sinkId = "sink"; + private final static OutputFormatter formatter = new TupleOutputFormatter(); + + public static FlinkTopologyBuilder buildTopology() { + return buildTopology(true); + } + /** Builds the WordCount topology; {@code indexOrName} true wires the by-index bolts, false the by-name bolts. */ + public static FlinkTopologyBuilder buildTopology(boolean indexOrName) { + + final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + + // get input data + if (fileInputOutput) { + // read the text file from given input path + final String[] tokens = textPath.split(":"); + final String inputFile = tokens[tokens.length - 1]; + builder.setSpout(spoutId, new WordCountFileSpout(inputFile)); + } else { + builder.setSpout(spoutId, new WordCountInMemorySpout()); + } + + if (indexOrName) { + // split up the lines in pairs (2-tuples) containing: (word,1) + builder.setBolt(tokenierzerId, new BoltTokenizer(), 4).shuffleGrouping(spoutId); + // group by the tuple field "0" and sum up tuple field "1" + builder.setBolt(counterId, new BoltCounter(), 4).fieldsGrouping(tokenierzerId, + new Fields(BoltTokenizer.ATTRIBUTE_WORD)); + } else { + // split up the lines in pairs (2-tuples) containing: (word,1) + builder.setBolt(tokenierzerId, new BoltTokenizerByName(), 4).shuffleGrouping( + spoutId); + // group by the tuple field "word" and sum up tuple field "count" + builder.setBolt(counterId, new BoltCounterByName(), 4).fieldsGrouping( + tokenierzerId, new Fields(BoltTokenizerByName.ATTRIBUTE_WORD)); + } + + // emit result + if (fileInputOutput) { + // write the result to the given output path + final String[] tokens = outputPath.split(":"); + final String outputFile = tokens[tokens.length - 1]; + builder.setBolt(sinkId, new 
BoltFileSink(outputFile, formatter)).shuffleGrouping(counterId); + } else { + builder.setBolt(sinkId, new BoltPrintSink(formatter), 4).shuffleGrouping(counterId); + } + + return builder; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileInputOutput = false; + private static String textPath; + private static String outputPath; + /** Parses the arguments: none selects the built-in data, exactly two are text path and result path; returns false on a usage error. */ + static boolean parseParameters(final String[] args) { + + if (args.length > 0) { + // parse input arguments + fileInputOutput = true; + if (args.length == 2) { + textPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: WordCount* <text path> <result path>"); + return false; + } + } else { + System.out.println("Executing WordCount example with built-in default data"); + System.out.println(" Provide parameters to read input data from a file"); + System.out.println(" Usage: WordCount* <text path> <result path>"); + } + + return true; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java new file mode 100644 index 0000000..d21a584 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount.operators; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +/** + * Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple + * schema: {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema: + * {@code <String,Integer>} ). + * <p> + * Same as {@link BoltCounterByName}, but accesses input attribute by index (instead of name). 
+ */ +public class BoltCounter implements IRichBolt { + private static final long serialVersionUID = 399619605462625934L; + + public static final String ATTRIBUTE_WORD = "word"; + public static final String ATTRIBUTE_COUNT = "count"; + + private final HashMap<String, Count> counts = new HashMap<String, Count>(); + private OutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(final Tuple input) { + final String word = input.getString(BoltTokenizer.ATTRIBUTE_WORD_INDEX); + /* look up the running count for this word, creating it the first time the word is seen */ + Count currentCount = this.counts.get(word); + if (currentCount == null) { + currentCount = new Count(); + this.counts.put(word, currentCount); + } + currentCount.count += input.getInteger(BoltTokenizer.ATTRIBUTE_COUNT_INDEX); + /* emit the updated running total for the word */ + this.collector.emit(new Values(word, currentCount.count)); + } + + @Override + public void cleanup() {/* nothing to do */} + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT)); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + /** + * A mutable counter helper: the count is updated in place, so immutable tuples can be emitted to the given + * stormCollector while avoiding unnecessary object creation/deletion. 
+ */ + private static final class Count { + public int count; + + public Count() {/* nothing to do */} + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java new file mode 100644 index 0000000..d5c05d7 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.wordcount.operators; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +/** + * Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple + * schema: {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema: + * {@code <String,Integer>} ). + * <p> + * Same as {@link BoltCounter}, but accesses input attribute by name (instead of index). + */ +public class BoltCounterByName implements IRichBolt { + private static final long serialVersionUID = 399619605462625934L; + + public static final String ATTRIBUTE_WORD = "word"; + public static final String ATTRIBUTE_COUNT = "count"; + + private final HashMap<String, Count> counts = new HashMap<String, Count>(); + private OutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(final Tuple input) { + final String word = input.getStringByField(BoltTokenizer.ATTRIBUTE_WORD); + /* NOTE(review): the field names are taken from BoltTokenizer's constants although the by-name topology wires in BoltTokenizerByName; both declare "word" and "count" */ + Count currentCount = this.counts.get(word); + if (currentCount == null) { + currentCount = new Count(); + this.counts.put(word, currentCount); + } + currentCount.count += input.getIntegerByField(BoltTokenizer.ATTRIBUTE_COUNT); + /* emit the updated running total for the word */ + this.collector.emit(new Values(word, currentCount.count)); + } + + @Override + public void cleanup() {/* nothing to do */} + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT)); + } + + @Override + public 
Map<String, Object> getComponentConfiguration() { + return null; + } + + /** + * A mutable counter helper: the count is updated in place, so immutable tuples can be emitted to the given + * stormCollector while avoiding unnecessary object creation/deletion. + */ + private static final class Count { + public int count; + + public Count() {/* nothing to do */} + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java new file mode 100644 index 0000000..74d6a99 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.wordcount.operators; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import java.util.Map; + +/** + * Implements the string tokenizer that splits sentences into words as a bolt. The bolt takes a line (input tuple + * schema: {@code <String>}), lower-cases it, and splits it on non-word characters into multiple pairs in the form of + * "(word,1)" (output tuple schema: {@code <String,Integer>}); empty tokens are skipped. + * <p> + * Same as {@link BoltTokenizerByName}, but accesses input attribute by index (instead of name). + */ +public final class BoltTokenizer implements IRichBolt { + private static final long serialVersionUID = -8589620297208175149L; + + public static final String ATTRIBUTE_WORD = "word"; + public static final String ATTRIBUTE_COUNT = "count"; + + public static final int ATTRIBUTE_WORD_INDEX = 0; + public static final int ATTRIBUTE_COUNT_INDEX = 1; + + private OutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(final Tuple input) { + final String[] tokens = input.getString(0).toLowerCase().split("\\W+"); + /* emit one (token, 1) pair per non-empty token */ + for (final String token : tokens) { + if (token.length() > 0) { + this.collector.emit(new Values(token, 1)); + } + } + } + + @Override + public void cleanup() {/* nothing to do */} + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT)); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java new file mode 100644 index 0000000..3c56b36 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount.operators; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import java.util.Map; + +/** + * Implements the string tokenizer that splits sentences into words as a bolt. 
The bolt takes a line (input tuple + * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema: + * {@code <String,Integer>}). + * <p> + * Same as {@link BoltTokenizer}, but accesses input attribute by name (instead of index). + */ +public final class BoltTokenizerByName implements IRichBolt { + private static final long serialVersionUID = -8589620297208175149L; + + public static final String ATTRIBUTE_WORD = "word"; + public static final String ATTRIBUTE_COUNT = "count"; + + public static final int ATTRIBUTE_WORD_INDEX = 0; + public static final int ATTRIBUTE_COUNT_INDEX = 1; + + private OutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(final Tuple input) { + final String[] tokens = input.getStringByField("sentence").toLowerCase().split("\\W+"); + /* the line is read from the input field named "sentence", lower-cased and split on non-word characters; one (token, 1) pair is emitted per non-empty token */ + for (final String token : tokens) { + if (token.length() > 0) { + this.collector.emit(new Values(token, 1)); + } + } + } + + @Override + public void cleanup() {/* nothing to do */} + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT)); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java new file mode 100644 index 0000000..3a8fd3a --- /dev/null 
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount.operators; + +import java.io.Serializable; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; + +public class WordCountDataPojos { + public static Sentence[] SENTENCES; + + static { + SENTENCES = new Sentence[WordCountData.WORDS.length]; + for (int i = 0; i < SENTENCES.length; ++i) { + SENTENCES[i] = new Sentence(WordCountData.WORDS[i]); + } + } + + public static class Sentence implements Serializable { + private static final long serialVersionUID = -7336372859203407522L; + + private String sentence; + + public Sentence() { + } + + public Sentence(String sentence) { + this.sentence = sentence; + } + + public String getSentence() { + return sentence; + } + + public void setSentence(String sentence) { + this.sentence = sentence; + } + + @Override + public String toString() { + return "(" + this.sentence + ")"; + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java new file mode 100644 index 0000000..16e2ba0 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.wordcount.operators; + +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.examples.java.wordcount.util.WordCountData; + +@SuppressWarnings("unchecked") +public class WordCountDataTuple { + public static Tuple1<String>[] TUPLES; + + static { + TUPLES = new Tuple1[WordCountData.WORDS.length]; + for (int i = 0; i < TUPLES.length; ++i) { + TUPLES[i] = new Tuple1<String>(WordCountData.WORDS[i]); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java new file mode 100644 index 0000000..76a198f --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.wordcount.operators; + +import org.apache.flink.storm.util.FileSpout; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; + +/** + * Implements a Spout that reads data from a given local file. + */ +public final class WordCountFileSpout extends FileSpout { + private static final long serialVersionUID = 2372251989250954503L; + + public WordCountFileSpout(String path) { + super(path); + } + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sentence")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java new file mode 100644 index 0000000..eb96160 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount.operators; + +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.storm.util.InMemorySpout; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; + +/** + * Implements a Spout that reads data from {@link WordCountData#WORDS}. + */ +public final class WordCountInMemorySpout extends InMemorySpout<String> { + private static final long serialVersionUID = 8832143302409465843L; + + public WordCountInMemorySpout() { + super(WordCountData.WORDS); + } + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sentence")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java new file mode 100644 index 0000000..781396c --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.exclamation; + +import org.apache.flink.storm.excamation.ExclamationWithBolt; +import org.apache.flink.storm.exclamation.util.ExclamationData; +import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.test.testdata.WordCountData; + +public class ExclamationWithBoltITCase extends StormTestBase { + + protected String textPath; + protected String resultPath; + protected String exclamationNum; + + @Override + protected void preSubmit() throws Exception { + this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); + this.resultPath = this.getTempDirPath("result"); + this.exclamationNum = "3"; + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); + } + + @Override + protected void testProgram() throws Exception { + ExclamationWithBolt.main(new String[]{this.textPath, this.resultPath, this.exclamationNum}); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java new file mode 100644 index 0000000..36b8aed --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.exclamation; + +import org.apache.flink.storm.excamation.ExclamationWithSpout; +import org.apache.flink.storm.exclamation.util.ExclamationData; +import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.test.testdata.WordCountData; + +public class ExclamationWithSpoutITCase extends StormTestBase { + + protected String textPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); + this.resultPath = this.getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); + } + + @Override + protected void testProgram() throws Exception { + ExclamationWithSpout.main(new String[]{this.textPath, this.resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java new file mode 100644 index 0000000..cec276f --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.exclamation; + +import org.apache.flink.storm.excamation.ExclamationLocal; +import org.apache.flink.storm.exclamation.util.ExclamationData; +import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.test.testdata.WordCountData; + +public class StormExclamationLocalITCase extends StormTestBase { + + protected String textPath; + protected String resultPath; + protected String exclamationNum; + + @Override + protected void preSubmit() throws Exception { + this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); + this.resultPath = this.getTempDirPath("result"); + this.exclamationNum = "3"; + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); + } + + @Override + protected void testProgram() throws Exception { + ExclamationLocal.main(new String[]{this.textPath, this.resultPath, this.exclamationNum}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java new file mode 100644 index 0000000..3c435f9 --- /dev/null +++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.exclamation.util; + +public class ExclamationData { + + public static final String TEXT_WITH_EXCLAMATIONS = + "Goethe - Faust: Der Tragoedie erster Teil!!!!!!\n" + + "Prolog im Himmel.!!!!!!\n" + + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei!!!!!!\n" + + "Erzengel treten vor.!!!!!!\n" + + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,!!!!!!\n" + + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick!!!!!!\n" + + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich!!!!!!\n" + + "hohen Werke Sind herrlich wie am ersten Tag.!!!!!!\n" + + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde!!!!!!\n" + + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es!!!!!!\n" + + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und!!!!!!\n" + + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.!!!!!!\n" + + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land!!!!!!\n" + + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.!!!!!!\n" + + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch!!!!!!\n" + + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.!!!!!!\n" + + "ZU DREI: Der Anblick gibt den Engeln Staerke, Da keiner dich ergruenden!!!!!!\n" + + "mag, Und alle deine hohen Werke Sind herrlich wie am ersten Tag.!!!!!!\n" + + "MEPHISTOPHELES: Da du, o Herr, dich einmal wieder nahst Und fragst, wie!!!!!!\n" + + "alles sich bei uns befinde, Und du mich sonst gewoehnlich gerne sahst, So!!!!!!\n" + + "siehst du mich auch unter dem Gesinde. Verzeih, ich kann nicht hohe Worte!!!!!!\n" + + "machen, Und wenn mich auch der ganze Kreis verhoehnt; Mein Pathos braechte!!!!!!\n" + + "dich gewiss zum Lachen, Haettst du dir nicht das Lachen abgewoehnt. Von!!!!!!\n" + + "Sonn' und Welten weiss ich nichts zu sagen, Ich sehe nur, wie sich die!!!!!!\n" + + "Menschen plagen. Der kleine Gott der Welt bleibt stets von gleichem!!!!!!\n" + + "Schlag, Und ist so wunderlich als wie am ersten Tag. Ein wenig besser!!!!!!\n" + + "wuerd er leben, Haettst du ihm nicht den Schein des Himmelslichts gegeben;!!!!!!\n" + + "Er nennt's Vernunft und braucht's allein, Nur tierischer als jedes Tier!!!!!!\n" + + "zu sein. Er scheint mir, mit Verlaub von euer Gnaden, Wie eine der!!!!!!\n" + + "langbeinigen Zikaden, Die immer fliegt und fliegend springt Und gleich im!!!!!!\n" + + "Gras ihr altes Liedchen singt; Und laeg er nur noch immer in dem Grase! In!!!!!!\n" + + "jeden Quark begraebt er seine Nase.!!!!!!\n" + + "DER HERR: Hast du mir weiter nichts zu sagen? Kommst du nur immer!!!!!!\n" + + "anzuklagen? 
Ist auf der Erde ewig dir nichts recht?!!!!!!\n" + + "MEPHISTOPHELES: Nein Herr! ich find es dort, wie immer, herzlich!!!!!!\n" + + "schlecht. Die Menschen dauern mich in ihren Jammertagen, Ich mag sogar!!!!!!\n" + + "die armen selbst nicht plagen.!!!!!!\n" + "DER HERR: Kennst du den Faust?!!!!!!\n" + + "MEPHISTOPHELES: Den Doktor?!!!!!!\n" + + "DER HERR: Meinen Knecht!!!!!!!\n" + + "MEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre Weise. Nicht irdisch!!!!!!\n" + + "ist des Toren Trank noch Speise. Ihn treibt die Gaerung in die Ferne, Er!!!!!!\n" + + "ist sich seiner Tollheit halb bewusst; Vom Himmel fordert er die schoensten!!!!!!\n" + + "Sterne Und von der Erde jede hoechste Lust, Und alle Naeh und alle Ferne!!!!!!\n" + + "Befriedigt nicht die tiefbewegte Brust.!!!!!!\n" + + "DER HERR: Wenn er mir auch nur verworren dient, So werd ich ihn bald in!!!!!!\n" + + "die Klarheit fuehren. Weiss doch der Gaertner, wenn das Baeumchen gruent, Das!!!!!!\n" + + "Bluet und Frucht die kuenft'gen Jahre zieren.!!!!!!\n" + + "MEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch verlieren! Wenn Ihr!!!!!!\n" + + "mir die Erlaubnis gebt, Ihn meine Strasse sacht zu fuehren.!!!!!!\n" + + "DER HERR: Solang er auf der Erde lebt, So lange sei dir's nicht verboten,!!!!!!\n" + + "Es irrt der Mensch so lang er strebt.!!!!!!\n" + + "MEPHISTOPHELES: Da dank ich Euch; denn mit den Toten Hab ich mich niemals!!!!!!\n" + + "gern befangen. Am meisten lieb ich mir die vollen, frischen Wangen. Fuer!!!!!!\n" + + "einem Leichnam bin ich nicht zu Haus; Mir geht es wie der Katze mit der Maus.!!!!!!\n" + + "DER HERR: Nun gut, es sei dir ueberlassen! Zieh diesen Geist von seinem!!!!!!\n" + + "Urquell ab, Und fuehr ihn, kannst du ihn erfassen, Auf deinem Wege mit!!!!!!\n" + + "herab, Und steh beschaemt, wenn du bekennen musst: Ein guter Mensch, in!!!!!!\n" + + "seinem dunklen Drange, Ist sich des rechten Weges wohl bewusst.!!!!!!\n" + + "MEPHISTOPHELES: Schon gut! nur dauert es nicht lange. 
Mir ist fuer meine!!!!!!\n" + + "Wette gar nicht bange. Wenn ich zu meinem Zweck gelange, Erlaubt Ihr mir!!!!!!\n" + + "Triumph aus voller Brust. Staub soll er fressen, und mit Lust, Wie meine!!!!!!\n" + + "Muhme, die beruehmte Schlange.!!!!!!\n" + + "DER HERR: Du darfst auch da nur frei erscheinen; Ich habe deinesgleichen!!!!!!\n" + + "nie gehasst. Von allen Geistern, die verneinen, ist mir der Schalk am!!!!!!\n" + + "wenigsten zur Last. Des Menschen Taetigkeit kann allzu leicht erschlaffen,!!!!!!\n" + + "er liebt sich bald die unbedingte Ruh; Drum geb ich gern ihm den Gesellen!!!!!!\n" + + "zu, Der reizt und wirkt und muss als Teufel schaffen. Doch ihr, die echten!!!!!!\n" + + "Goettersoehne, Erfreut euch der lebendig reichen Schoene! Das Werdende, das!!!!!!\n" + + "ewig wirkt und lebt, Umfass euch mit der Liebe holden Schranken, Und was!!!!!!\n" + + "in schwankender Erscheinung schwebt, Befestigt mit dauernden Gedanken!!!!!!!\n" + + "(Der Himmel schliesst, die Erzengel verteilen sich.)!!!!!!\n" + + "MEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich den Alten gern, Und!!!!!!\n" + + "huete mich, mit ihm zu brechen. Es ist gar huebsch von einem grossen Herrn,!!!!!!\n" + + "So menschlich mit dem Teufel selbst zu sprechen.!!!!!!"; +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/BoltSplitITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/BoltSplitITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/BoltSplitITCase.java new file mode 100644 index 0000000..dc174e7 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/BoltSplitITCase.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.split; + +import org.junit.Test; + +public class BoltSplitITCase { + + @Test + public void testTopology() throws Exception { + SplitStreamBoltLocal.main(new String[] { "0", "/dev/null" }); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java new file mode 100644 index 0000000..c7b9c1d --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.split; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +public class SplitBolt extends BaseRichBolt { + private static final long serialVersionUID = -6627606934204267173L; + + public static final String EVEN_STREAM = "even"; + public static final String ODD_STREAM = "odd"; + + private OutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + + } + + @Override + public void execute(Tuple input) { + if (input.getInteger(0) % 2 == 0) { + this.collector.emit(EVEN_STREAM, new Values(input.getInteger(0))); + } else { + this.collector.emit(ODD_STREAM, new Values(input.getInteger(0))); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + Fields schema = new Fields("number"); + declarer.declareStream(EVEN_STREAM, schema); + declarer.declareStream(ODD_STREAM, schema); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java new file mode 100644 index 0000000..d0973cb --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.split; + +import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.split.operators.RandomSpout; +import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt; +import org.apache.flink.storm.util.OutputFormatter; +import org.apache.flink.storm.util.BoltFileSink; +import org.apache.flink.storm.util.BoltPrintSink; +import org.apache.flink.storm.util.TupleOutputFormatter; + +public class SplitBoltTopology { + public final static String spoutId = "randomSource"; + public final static String boltId = "splitBolt"; + public final static String evenVerifierId = "evenVerifier"; + public final static String oddVerifierId = "oddVerifier"; + public final static String sinkId = "sink"; + private final static OutputFormatter formatter = new TupleOutputFormatter(); + + public static FlinkTopologyBuilder buildTopology() { + final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + + builder.setSpout(spoutId, new RandomSpout(false, seed)); + builder.setBolt(boltId, new SplitBolt()).shuffleGrouping(spoutId); + builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(boltId, + SplitBolt.EVEN_STREAM); + builder.setBolt(oddVerifierId, new VerifyAndEnrichBolt(false)).shuffleGrouping(boltId, + SplitBolt.ODD_STREAM); + + // emit result + if (outputPath != null) { + // write the result to a file sink; the file name is the part of the output path after the last ':' + final String[] tokens = outputPath.split(":"); + final String outputFile = tokens[tokens.length - 1]; + builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter)) + .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId); + } else { + builder.setBolt(sinkId, new BoltPrintSink(formatter), 4) + .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId); + } + + return builder; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + +
private static long seed = System.currentTimeMillis(); + private static String outputPath = null; + + static boolean parseParameters(final String[] args) { + + if (args.length > 0) { + // parse input arguments + if (args.length == 2) { + seed = Long.parseLong(args[0]); + outputPath = args[1]; + } else { + System.err.println("Usage: SplitStreamBoltLocal <seed> <result path>"); + return false; + } + } else { + System.out.println("Executing SplitBoltTopology example with random data"); + System.out.println(" Usage: SplitStreamBoltLocal <seed> <result path>"); + } + + return true; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java new file mode 100644 index 0000000..2b3b6a8 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.split; + +import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.split.operators.RandomSpout; +import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt; +import org.apache.flink.storm.util.OutputFormatter; +import org.apache.flink.storm.util.BoltFileSink; +import org.apache.flink.storm.util.BoltPrintSink; +import org.apache.flink.storm.util.TupleOutputFormatter; + +public class SplitSpoutTopology { + public final static String spoutId = "randomSplitSource"; + public final static String evenVerifierId = "evenVerifier"; + public final static String oddVerifierId = "oddVerifier"; + public final static String sinkId = "sink"; + private final static OutputFormatter formatter = new TupleOutputFormatter(); + + public static FlinkTopologyBuilder buildTopology() { + final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + + builder.setSpout(spoutId, new RandomSpout(true, seed)); + builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(spoutId, + RandomSpout.EVEN_STREAM); + builder.setBolt(oddVerifierId, new VerifyAndEnrichBolt(false)).shuffleGrouping(spoutId, + RandomSpout.ODD_STREAM); + + // emit result + if (outputPath != null) { + // read the text file from given input path + final String[] tokens = outputPath.split(":"); + final String outputFile = tokens[tokens.length - 1]; + builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter)) + .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId); + } else { + builder.setBolt(sinkId, new BoltPrintSink(formatter), 4) + .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId); + } + + return builder; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private 
static long seed = System.currentTimeMillis(); + private static String outputPath = null; + + static boolean parseParameters(final String[] args) { + + if (args.length > 0) { + // parse input arguments + if (args.length == 2) { + seed = Long.parseLong(args[0]); + outputPath = args[1]; + } else { + System.err.println("Usage: SplitStreamSpoutLocal <seed> <result path>"); + return false; + } + } else { + System.out.println("Executing SplitSpoutTopology example with random data"); + System.out.println(" Usage: SplitStreamSpoutLocal <seed> <result path>"); + } + + return true; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java new file mode 100644 index 0000000..e2c22f9 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.split; + +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopologyBuilder; + +import backtype.storm.utils.Utils; + +public class SplitStreamBoltLocal { + public final static String topologyId = "Bolt split stream example"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if (!SplitBoltTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way + final FlinkTopologyBuilder builder = SplitBoltTopology.buildTopology(); + + // execute program locally + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + cluster.submitTopology(topologyId, null, builder.createTopology()); + + Utils.sleep(5 * 1000); + + // TODO kill does no do anything so far + cluster.killTopology(topologyId); + cluster.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java new file mode 100644 index 0000000..2070f66 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.split; + +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopologyBuilder; + +import backtype.storm.utils.Utils; + +public class SplitStreamSpoutLocal { + public final static String topologyId = "Spout split stream example"; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(final String[] args) throws Exception { + + if (!SplitSpoutTopology.parseParameters(args)) { + return; + } + + // build Topology the Storm way + final FlinkTopologyBuilder builder = SplitSpoutTopology.buildTopology(); + + // execute program locally + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + cluster.submitTopology(topologyId, null, builder.createTopology()); + + Utils.sleep(5 * 1000); + + // TODO kill does no do anything so far + cluster.killTopology(topologyId); + cluster.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SpoutSplitITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SpoutSplitITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SpoutSplitITCase.java new file mode 100644 index 0000000..8e0fda9 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SpoutSplitITCase.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.split; + +import org.junit.Test; + +public class SpoutSplitITCase { + + @Test + public void testTopology() throws Exception { + SplitStreamSpoutLocal.main(new String[] { "0", "/dev/null" }); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java new file mode 100644 index 0000000..32dac7b --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.StreamingMode; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.AbstractTestBase; +import org.junit.Test; + +import static org.junit.Assert.fail; + +/** + * Base class for Storm tests. + */ +public abstract class StormTestBase extends AbstractTestBase { + + public static final int DEFAULT_PARALLELISM = 4; + + public StormTestBase() { + this(new Configuration()); + } + + public StormTestBase(Configuration config) { + super(config, StreamingMode.STREAMING); + setTaskManagerNumSlots(DEFAULT_PARALLELISM); + } + + // ------------------------------------------------------------------------ + // Methods to create the test program and for pre- and post- test work + // ------------------------------------------------------------------------ + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + // ------------------------------------------------------------------------ + // Test entry point + // ------------------------------------------------------------------------ + + @Test + public void testJob() throws Exception { + try { + // pre-submit + try { + preSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Pre-submit work caused an error: " + e.getMessage()); + } + + // prepare the test environment + startCluster(); + + // we need to initialize the stream test environment, and the storm local cluster + TestStreamEnvironment.setAsContext(this.executor, DEFAULT_PARALLELISM); + + FlinkLocalCluster.initialize(new FlinkLocalCluster.LocalClusterFactory() { + @Override + public FlinkLocalCluster createLocalCluster() { + return new FlinkLocalCluster(executor); + } + }); + + // call the 
test program + try { + testProgram(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Error while calling the test program: " + e.getMessage()); + } + + // post-submit + try { + postSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Post-submit work caused an error: " + e.getMessage()); + } + } + finally { + // reset the FlinkLocalCluster to its default behavior + FlinkLocalCluster.initialize(new FlinkLocalCluster.DefaultLocalClusterFactory()); + + // reset the StreamExecutionEnvironment to its default behavior + TestStreamEnvironment.unsetAsContext(); + + // clean up all resources + stopCluster(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java new file mode 100644 index 0000000..62d23ab --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount; + +import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.storm.wordcount.BoltTokenizerWordCount; +import org.apache.flink.test.testdata.WordCountData; + +public class BoltTokenizerWordCountITCase extends StormTestBase { + + protected String textPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); + this.resultPath = this.getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); + } + + @Override + protected void testProgram() throws Exception { + BoltTokenizerWordCount.main(new String[]{this.textPath, this.resultPath}); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java new file mode 100644 index 0000000..009bdc2 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java @@ -0,0 +1,46 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount; + +import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojo; +import org.apache.flink.test.testdata.WordCountData; + +public class BoltTokenizerWordCountPojoITCase extends StormTestBase { + + protected String textPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); + this.resultPath = this.getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); + } + + @Override + protected void testProgram() throws Exception { + BoltTokenizerWordCountPojo.main(new String[]{this.textPath, this.resultPath}); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java new file mode 100644 index 0000000..321015b --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.wordcount; + +import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.storm.wordcount.BoltTokenizerWordCountWithNames; +import org.apache.flink.test.testdata.WordCountData; + +public class BoltTokenizerWordCountWithNamesITCase extends StormTestBase { + + protected String textPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); + this.resultPath = this.getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); + } + + @Override + protected void testProgram() throws Exception { + BoltTokenizerWordCountWithNames.main(new String[]{this.textPath, this.resultPath}); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java new file mode 100644 index 0000000..0cff211 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wordcount; + +import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.storm.wordcount.SpoutSourceWordCount; +import org.apache.flink.test.testdata.WordCountData; + +public class SpoutSourceWordCountITCase extends StormTestBase { + + protected String textPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); + this.resultPath = this.getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); + } + + @Override + protected void testProgram() throws Exception { + SpoutSourceWordCount.main(new String[]{this.textPath, this.resultPath}); + } + +}