http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java index 2b3b6a8..46bf929 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java @@ -17,12 +17,12 @@ */ package org.apache.flink.storm.split; -import org.apache.flink.storm.api.FlinkTopologyBuilder; +import backtype.storm.topology.TopologyBuilder; import org.apache.flink.storm.split.operators.RandomSpout; import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt; -import org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.BoltFileSink; import org.apache.flink.storm.util.BoltPrintSink; +import org.apache.flink.storm.util.OutputFormatter; import org.apache.flink.storm.util.TupleOutputFormatter; public class SplitSpoutTopology { @@ -32,8 +32,8 @@ public class SplitSpoutTopology { public final static String sinkId = "sink"; private final static OutputFormatter formatter = new TupleOutputFormatter(); - public static FlinkTopologyBuilder buildTopology() { - final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + public static TopologyBuilder buildTopology() { + final TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(spoutId, new RandomSpout(true, seed)); builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(spoutId,
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java index e2c22f9..19d5873 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java @@ -16,10 +16,11 @@ */ package org.apache.flink.storm.split; -import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.storm.api.FlinkTopologyBuilder; - +import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class SplitStreamBoltLocal { public final static String topologyId = "Bolt split stream example"; @@ -35,16 +36,13 @@ public class SplitStreamBoltLocal { } // build Topology the Storm way - final FlinkTopologyBuilder builder = SplitBoltTopology.buildTopology(); + final TopologyBuilder builder = SplitBoltTopology.buildTopology(); - // execute program locally final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, null, builder.createTopology()); + cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder)); - Utils.sleep(5 * 1000); + Utils.sleep(10 * 1000); - // TODO kill does no do anything so far - cluster.killTopology(topologyId); cluster.shutdown(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java index 2070f66..4ab9d8a 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java @@ -16,10 +16,11 @@ */ package org.apache.flink.storm.split; -import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.storm.api.FlinkTopologyBuilder; - +import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class SplitStreamSpoutLocal { public final static String topologyId = "Spout split stream example"; @@ -35,16 +36,13 @@ public class SplitStreamSpoutLocal { } // build Topology the Storm way - final FlinkTopologyBuilder builder = SplitSpoutTopology.buildTopology(); + final TopologyBuilder builder = SplitSpoutTopology.buildTopology(); - // execute program locally final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, null, builder.createTopology()); + cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder)); - Utils.sleep(5 * 1000); + Utils.sleep(10 * 1000); - // TODO kill does no do anything so far - cluster.killTopology(topologyId); cluster.shutdown(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java index 2421324..77a35d0 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java @@ -17,17 +17,18 @@ */ package org.apache.flink.storm.tests; +import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.tests.operators.FiniteRandomSpout; import org.apache.flink.storm.tests.operators.TaskIdBolt; import org.apache.flink.storm.util.BoltFileSink; -import org.apache.flink.storm.util.StormTestBase; +import org.apache.flink.streaming.util.StreamingProgramTestBase; -public class StormFieldsGroupingITCase extends StormTestBase { +public class StormFieldsGroupingITCase extends StreamingProgramTestBase { private final static String topologyId = "FieldsGrouping Test"; private final static String spoutId = "spout"; @@ -52,7 +53,7 @@ public class StormFieldsGroupingITCase extends StormTestBase { final String[] tokens = this.resultPath.split(":"); final String outputFile = tokens[tokens.length - 1]; - final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + final TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2)); 
builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping( @@ -60,7 +61,7 @@ public class StormFieldsGroupingITCase extends StormTestBase { builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId); final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, null, builder.createTopology()); + cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder)); Utils.sleep(10 * 1000); http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java index 6c5bea2..b69dde7 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java @@ -27,6 +27,9 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +/** + * Bolt to prepend all incoming tuple values with the task id. 
+ */ public class TaskIdBolt extends BaseRichBolt { private static final long serialVersionUID = -7966475984592762720L; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java deleted file mode 100644 index 32dac7b..0000000 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.storm.util; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; -import org.apache.flink.storm.api.FlinkLocalCluster; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.test.util.AbstractTestBase; -import org.junit.Test; - -import static org.junit.Assert.fail; - -/** - * Base class for Storm tests. 
- */ -public abstract class StormTestBase extends AbstractTestBase { - - public static final int DEFAULT_PARALLELISM = 4; - - public StormTestBase() { - this(new Configuration()); - } - - public StormTestBase(Configuration config) { - super(config, StreamingMode.STREAMING); - setTaskManagerNumSlots(DEFAULT_PARALLELISM); - } - - // ------------------------------------------------------------------------ - // Methods to create the test program and for pre- and post- test work - // ------------------------------------------------------------------------ - - protected abstract void testProgram() throws Exception; - - protected void preSubmit() throws Exception {} - - protected void postSubmit() throws Exception {} - - // ------------------------------------------------------------------------ - // Test entry point - // ------------------------------------------------------------------------ - - @Test - public void testJob() throws Exception { - try { - // pre-submit - try { - preSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Pre-submit work caused an error: " + e.getMessage()); - } - - // prepare the test environment - startCluster(); - - // we need to initialize the stream test environment, and the storm local cluster - TestStreamEnvironment.setAsContext(this.executor, DEFAULT_PARALLELISM); - - FlinkLocalCluster.initialize(new FlinkLocalCluster.LocalClusterFactory() { - @Override - public FlinkLocalCluster createLocalCluster() { - return new FlinkLocalCluster(executor); - } - }); - - // call the test program - try { - testProgram(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Error while calling the test program: " + e.getMessage()); - } - - // post-submit - try { - postSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Post-submit work caused an error: " + e.getMessage()); - } - } - finally { - // 
reset the FlinkLocalCluster to its default behavior - FlinkLocalCluster.initialize(new FlinkLocalCluster.DefaultLocalClusterFactory()); - - // reset the StreamExecutionEnvironment to its default behavior - TestStreamEnvironment.unsetAsContext(); - - // clean up all resources - stopCluster(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java index 62d23ab..f48e2f6 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java @@ -18,11 +18,10 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.storm.util.StormTestBase; -import org.apache.flink.storm.wordcount.BoltTokenizerWordCount; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class BoltTokenizerWordCountITCase extends StormTestBase { +public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java index 009bdc2..902cacf 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java @@ -18,11 +18,10 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.storm.util.StormTestBase; -import org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojo; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class BoltTokenizerWordCountPojoITCase extends StormTestBase { +public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java index 321015b..160efb3 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java @@ -18,11 +18,10 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.storm.util.StormTestBase; -import org.apache.flink.storm.wordcount.BoltTokenizerWordCountWithNames; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import 
org.apache.flink.test.testdata.WordCountData; -public class BoltTokenizerWordCountWithNamesITCase extends StormTestBase { +public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java index 0cff211..17f5be5 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java @@ -18,11 +18,10 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.storm.util.StormTestBase; -import org.apache.flink.storm.wordcount.SpoutSourceWordCount; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class SpoutSourceWordCountITCase extends StormTestBase { +public class SpoutSourceWordCountITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java index 
39e7a25..7081207 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java @@ -18,11 +18,10 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.storm.util.StormTestBase; -import org.apache.flink.storm.wordcount.WordCountLocal; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class WordCountLocalITCase extends StormTestBase { +public class WordCountLocalITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java index 78acfe5..b04faa5 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java @@ -18,11 +18,11 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.storm.util.StormTestBase; -import org.apache.flink.storm.wordcount.WordCountLocalByName; +import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class WordCountLocalNamedITCase extends StormTestBase { + +public class WordCountLocalNamedITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; 
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties b/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties index 0b686e5..881dc06 100644 --- a/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties +++ b/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties @@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index c311f6c..fa7ae79 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -183,10 +183,10 @@ public class FlinkClient { /* set storm configuration */ if (this.conf != null) { - topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf)); + topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(this.conf)); } - final StreamGraph streamGraph = topology.getStreamGraph(); + final StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph(); streamGraph.setJobName(name); final JobGraph jobGraph = 
streamGraph.getJobGraph(); http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java index 944c6cd..00e1d03 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java @@ -26,6 +26,7 @@ import backtype.storm.generated.StormTopology; import backtype.storm.generated.SubmitOptions; import backtype.storm.generated.TopologyInfo; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -48,12 +49,10 @@ public class FlinkLocalCluster { private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class); /** The flink mini cluster on which to execute the programs */ - private final FlinkMiniCluster flink; + private FlinkMiniCluster flink; public FlinkLocalCluster() { - this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING); - this.flink.start(); } public FlinkLocalCluster(FlinkMiniCluster flink) { @@ -71,13 +70,26 @@ public class FlinkLocalCluster { LOG.info("Running Storm topology on FlinkLocalCluster"); if(conf != null) { - topology.getConfig().setGlobalJobParameters(new StormConfig(conf)); + topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(conf)); } - StreamGraph streamGraph = topology.getStreamGraph(); + StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph(); streamGraph.setJobName(topologyName); JobGraph jobGraph = 
streamGraph.getJobGraph(); + + if (flink == null) { + + Configuration configuration = new Configuration(); + configuration.addAll(jobGraph.getJobConfiguration()); + + configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); + + flink = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING); + this.flink.start(); + } + this.flink.submitJobDetached(jobGraph); } @@ -99,6 +111,7 @@ public class FlinkLocalCluster { public void shutdown() { flink.stop(); + flink = null; } public String getTopologyConf(final String id) { http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java index febd56d..7c8d1ec 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java @@ -20,7 +20,6 @@ package org.apache.flink.storm.api; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TypeExtractor; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java index 531d6df..7d55463 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java @@ -1,4 +1,5 @@ /* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,75 +16,473 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.IRichStateSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology} - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or - * {@link FlinkClient}. + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program. + * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong> */ -public class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology { + + /** All declared streams and output schemas by operator ID */ + private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>(); + /** All spouts&bolts declarers by their ID */ + private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>(); + + private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt = + new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>(); + + final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>(); - /** The number of declared tasks for the whole program (ie, sum over all dops) */ - private int numberOfTasks = 0; + private final TopologyBuilder builder; - public FlinkTopology() { - // Set default parallelism to 1, to mirror Storm default behavior - super.setParallelism(1); + // needs to be a class member for internal testing purpose + private final StormTopology 
stormTopology; + + private final Map<String, IRichSpout> spouts; + private final Map<String, IRichBolt> bolts; + + private final StreamExecutionEnvironment env; + + private FlinkTopology(TopologyBuilder builder) { + this.builder = builder; + this.stormTopology = builder.createTopology(); + // extract the spouts and bolts + this.spouts = getPrivateField("_spouts"); + this.bolts = getPrivateField("_bolts"); + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Kick off the translation immediately + translateTopology(); } /** - * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link - * FlinkClient}. * - * @throws UnsupportedOperationException - * at every invocation + * Creates a Flink program that uses the specified spouts and bolts. + * @param stormBuilder The Storm topology builder to use for creating the Flink topology. + * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed. */ - @Override - public JobExecutionResult execute() throws Exception { - throw new UnsupportedOperationException( - "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " + - "instead."); + public static FlinkTopology createTopology(TopologyBuilder stormBuilder) { + return new FlinkTopology(stormBuilder); } /** - * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link - * FlinkClient}. - * - * @throws UnsupportedOperationException - * at every invocation + * Returns the underlying Flink {@link StreamExecutionEnvironment} for the Storm topology. + * @return The contextual environment (local or remote). */ - @Override - public JobExecutionResult execute(final String jobName) throws Exception { - throw new UnsupportedOperationException( - "A FlinkTopology cannot be executed directly. 
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " + - "instead."); + public StreamExecutionEnvironment getExecutionEnvironment() { + return this.env; } /** - * Increased the number of declared tasks of this program by the given value. - * - * @param dop - * The dop of a new operator that increases the number of overall tasks. + * Directly executes the Storm topology based on the current context (local when in IDE and + * remote when executed through ./bin/flink). + * @return The Flink {@link JobExecutionResult} after the execution of the Storm topology. + * @throws Exception which occurs during execution of the translated Storm topology. */ - public void increaseNumberOfTasks(final int dop) { - assert (dop > 0); - this.numberOfTasks += dop; + public JobExecutionResult execute() throws Exception { + return env.execute(); + } + + + @SuppressWarnings("unchecked") + private <T> Map<String, T> getPrivateField(String field) { + try { + Field f = builder.getClass().getDeclaredField(field); + f.setAccessible(true); + return copyObject((Map<String, T>) f.get(builder)); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Couldn't get " + field + " from TopologyBuilder", e); + } + } + + private <T> T copyObject(T object) { + try { + return InstantiationUtil.deserializeObject( + InstantiationUtil.serializeObject(object), + getClass().getClassLoader() + ); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException("Failed to copy object."); + } } /** - * Return the number or required tasks to execute this program. - * - * @return the number or required tasks to execute this program + * Creates a Flink program that uses the specified spouts and bolts. 
*/ - public int getNumberOfTasks() { - return this.numberOfTasks; + private void translateTopology() { + + unprocessdInputsPerBolt.clear(); + outputStreams.clear(); + declarers.clear(); + availableInputs.clear(); + + // Storm defaults to parallelism 1 + env.setParallelism(1); + + /* Translation of topology */ + + + for (final Entry<String, IRichSpout> spout : spouts.entrySet()) { + final String spoutId = spout.getKey(); + final IRichSpout userSpout = spout.getValue(); + + final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); + userSpout.declareOutputFields(declarer); + final HashMap<String,Fields> sourceStreams = declarer.outputStreams; + this.outputStreams.put(spoutId, sourceStreams); + declarers.put(spoutId, declarer); + + + final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>(); + final DataStreamSource<?> source; + + if (sourceStreams.size() == 1) { + final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null); + spoutWrapperSingleOutput.setStormTopology(stormTopology); + + final String outputStreamId = (String) sourceStreams.keySet().toArray()[0]; + + DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId, + declarer.getOutputType(outputStreamId)); + + outputStreams.put(outputStreamId, src); + source = src; + } else { + final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>( + userSpout, spoutId, null, null); + spoutWrapperMultipleOutputs.setStormTopology(stormTopology); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource( + spoutWrapperMultipleOutputs, spoutId, + (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class)); + + SplitStream<SplitStreamType<Tuple>> splitSource = multiSource + .split(new StormStreamSelector<Tuple>()); + for (String streamId : sourceStreams.keySet()) { + 
SingleOutputStreamOperator<Tuple, ?> outStream = splitSource.select(streamId) + .map(new SplitStreamMapper<Tuple>()); + outStream.getTransformation().setOutputType(declarer.getOutputType(streamId)); + outputStreams.put(streamId, outStream); + } + source = multiSource; + } + availableInputs.put(spoutId, outputStreams); + + final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common(); + if (common.is_set_parallelism_hint()) { + int dop = common.get_parallelism_hint(); + source.setParallelism(dop); + } else { + common.set_parallelism_hint(1); + } + } + + /** + * 1. Connect all spout streams with bolts streams + * 2. Then proceed with the bolts stream already connected + * + * Because we do not know the order in which an iterator steps over a set, we might process a consumer before + * its producer + * ->thus, we might need to repeat multiple times + */ + boolean makeProgress = true; + while (bolts.size() > 0) { + if (!makeProgress) { + throw new RuntimeException( + "Unable to build Topology. 
Could not connect the following bolts: " + + bolts.keySet()); + } + makeProgress = false; + + final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator(); + while (boltsIterator.hasNext()) { + + final Entry<String, IRichBolt> bolt = boltsIterator.next(); + final String boltId = bolt.getKey(); + final IRichBolt userBolt = copyObject(bolt.getValue()); + + final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common(); + + Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId); + if (unprocessedBoltInputs == null) { + unprocessedBoltInputs = new HashSet<>(); + unprocessedBoltInputs.addAll(common.get_inputs().entrySet()); + unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs); + } + + // check if all inputs are available + final int numberOfInputs = unprocessedBoltInputs.size(); + int inputsAvailable = 0; + for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) { + final String producerId = entry.getKey().get_componentId(); + final String streamId = entry.getKey().get_streamId(); + final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId); + if (streams != null && streams.get(streamId) != null) { + inputsAvailable++; + } + } + + if (inputsAvailable != numberOfInputs) { + // traverse other bolts first until inputs are available + continue; + } else { + makeProgress = true; + boltsIterator.remove(); + } + + final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs); + + for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) { + final GlobalStreamId streamId = input.getKey(); + final Grouping grouping = input.getValue(); + + final String producerId = streamId.get_componentId(); + + final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId); + + inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer)); + } + + final Iterator<Entry<GlobalStreamId, 
DataStream<Tuple>>> iterator = inputStreams.entrySet().iterator(); + + final Entry<GlobalStreamId, DataStream<Tuple>> firstInput = iterator.next(); + GlobalStreamId streamId = firstInput.getKey(); + DataStream<Tuple> inputStream = firstInput.getValue(); + + final SingleOutputStreamOperator<?, ?> outputStream; + + switch (numberOfInputs) { + case 1: + outputStream = createOutput(boltId, userBolt, streamId, inputStream); + break; + case 2: + Entry<GlobalStreamId, DataStream<Tuple>> secondInput = iterator.next(); + GlobalStreamId streamId2 = secondInput.getKey(); + DataStream<Tuple> inputStream2 = secondInput.getValue(); + outputStream = createOutput(boltId, userBolt, streamId, inputStream, streamId2, inputStream2); + break; + default: + throw new UnsupportedOperationException("Don't know how to translate a bolt " + + boltId + " with " + numberOfInputs + " inputs."); + } + + if (common.is_set_parallelism_hint()) { + int dop = common.get_parallelism_hint(); + outputStream.setParallelism(dop); + } else { + common.set_parallelism_hint(1); + } + + } + } } + private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt, + GlobalStreamId streamId, Grouping grouping, + Map<String, DataStream<Tuple>> producer) { + + assert (userBolt != null); + assert(boltId != null); + assert(streamId != null); + assert(grouping != null); + assert(producer != null); + + final String producerId = streamId.get_componentId(); + final String inputStreamId = streamId.get_streamId(); + + DataStream<Tuple> inputStream = producer.get(inputStreamId); + + final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); + declarers.put(boltId, declarer); + userBolt.declareOutputFields(declarer); + this.outputStreams.put(boltId, declarer.outputStreams); + + // if producer was processed already + if (grouping.is_set_shuffle()) { + // Storm uses a round-robin shuffle strategy + inputStream = inputStream.rebalance(); + } else if (grouping.is_set_fields()) { + // global grouping is 
emulated in Storm via an empty fields grouping list + final List<String> fields = grouping.get_fields(); + if (fields.size() > 0) { + FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId); + inputStream = inputStream.keyBy(prodDeclarer + .getGroupingFieldIndexes(inputStreamId, + grouping.get_fields())); + } else { + inputStream = inputStream.global(); + } + } else if (grouping.is_set_all()) { + inputStream = inputStream.broadcast(); + } else if (!grouping.is_set_local_or_shuffle()) { + throw new UnsupportedOperationException( + "Flink only supports (local-or-)shuffle, fields, all, and global grouping"); + } + + return inputStream; + } + + private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt, GlobalStreamId streamId, DataStream<Tuple> inputStream) { + return createOutput(boltId, bolt, streamId, inputStream, null, null); + } + + private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt, + GlobalStreamId streamId, DataStream<Tuple> inputStream, + GlobalStreamId streamId2, DataStream<Tuple> inputStream2) { + assert(boltId != null); + assert(streamId != null); + assert(inputStream != null); + Preconditions.checkArgument((streamId2 == null) == (inputStream2 == null)); + + String producerId = streamId.get_componentId(); + String inputStreamId = streamId.get_streamId(); + + final HashMap<String, Fields> boltOutputs = this.outputStreams.get(boltId); + + final FlinkOutputFieldsDeclarer declarer = declarers.get(boltId); + + final SingleOutputStreamOperator<?, ?> outputStream; + + if (boltOutputs.size() < 2) { // single output stream or sink + String outputStreamId; + if (boltOutputs.size() == 1) { + outputStreamId = (String) boltOutputs.keySet().toArray()[0]; + } else { + outputStreamId = null; + } + + final TypeInformation<Tuple> outType = declarer + .getOutputType(outputStreamId); + + final SingleOutputStreamOperator<Tuple, ?> outStream; + + // only one input + if (streamId2 == null) { + 
BoltWrapper<Tuple, Tuple> boltWrapper = new BoltWrapper<>( + bolt, boltId, producerId, inputStreamId, + this.outputStreams.get(producerId).get(inputStreamId), null); + boltWrapper.setStormTopology(stormTopology); + + + outStream = inputStream.transform(boltId, outType, boltWrapper); + + } else { + String producerId2 = streamId2.get_componentId(); + String inputStreamId2 = streamId2.get_streamId(); + + final BoltWrapperTwoInput<Tuple, Tuple, Tuple> boltWrapper = new BoltWrapperTwoInput<>( + bolt, boltId, + inputStreamId, inputStreamId2, producerId, producerId2, + this.outputStreams.get(producerId).get(inputStreamId), + this.outputStreams.get(producerId2).get(inputStreamId2) + ); + boltWrapper.setStormTopology(stormTopology); + + outStream = inputStream.connect(inputStream2).transform(boltId, outType, boltWrapper); + } + + if (outType != null) { + // only for non-sink nodes + final HashMap<String, DataStream<Tuple>> op = new HashMap<>(); + op.put(outputStreamId, outStream); + availableInputs.put(boltId, op); + } + outputStream = outStream; + } else { + + @SuppressWarnings({ "unchecked", "rawtypes" }) + final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor + .getForClass(SplitStreamType.class); + + + final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream; + + // only one input + if (streamId2 == null) { + final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<>( + bolt, boltId, inputStreamId, producerId, this.outputStreams.get(producerId).get(inputStreamId), + null + ); + boltWrapperMultipleOutputs.setStormTopology(stormTopology); + + multiStream = inputStream.transform(boltId, outType, boltWrapperMultipleOutputs); + } else { + String producerId2 = streamId2.get_componentId(); + String inputStreamId2 = streamId2.get_streamId(); + + final BoltWrapperTwoInput<Tuple, Tuple, SplitStreamType<Tuple>> boltWrapper = new BoltWrapperTwoInput<>( + bolt, boltId, + inputStreamId, 
inputStreamId2, producerId, producerId2, + this.outputStreams.get(producerId).get(inputStreamId), + this.outputStreams.get(producerId2).get(inputStreamId2) + ); + boltWrapper.setStormTopology(stormTopology); + + multiStream = inputStream.connect(inputStream2).transform(boltId, outType, boltWrapper); + } + + final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream + .split(new StormStreamSelector<Tuple>()); + + final HashMap<String, DataStream<Tuple>> op = new HashMap<>(); + for (String outputStreamId : boltOutputs.keySet()) { + op.put(outputStreamId, + splitStream.select(outputStreamId).map( + new SplitStreamMapper<Tuple>())); + SingleOutputStreamOperator<Tuple, ?> outStream = splitStream + .select(outputStreamId).map(new SplitStreamMapper<Tuple>()); + outStream.getTransformation().setOutputType(declarer.getOutputType(outputStreamId)); + op.put(outputStreamId, outStream); + } + availableInputs.put(boltId, op); + outputStream = multiStream; + } + + return outputStream; + } + + // for internal testing purpose only + public StormTopology getStormTopology() { + return this.stormTopology; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java deleted file mode 100644 index 42e1d68..0000000 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java +++ /dev/null @@ -1,421 +0,0 @@ -/* -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.storm.api; - -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.generated.Grouping; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.BasicBoltExecutor; -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.IBasicBolt; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.IRichStateSpout; -import backtype.storm.topology.SpoutDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.storm.util.SplitStreamMapper; -import org.apache.flink.storm.util.SplitStreamType; -import org.apache.flink.storm.util.StormStreamSelector; -import org.apache.flink.storm.wrappers.BoltWrapper; -import org.apache.flink.storm.wrappers.SpoutWrapper; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.SplitStream; - -import java.util.HashMap; -import java.util.HashSet; 
-import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; - -/** - * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm - * topology. Most methods (except {@link #createTopology()} are copied from the original {@link TopologyBuilder} - * implementation to ensure equal behavior.<br> - * <br> - * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong> - */ -public class FlinkTopologyBuilder { - - /** A Storm {@link TopologyBuilder} to build a real Storm topology */ - private final TopologyBuilder stormBuilder = new TopologyBuilder(); - /** All user spouts by their ID */ - private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>(); - /** All user bolts by their ID */ - private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>(); - /** All declared streams and output schemas by operator ID */ - private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>(); - /** All spouts&bolts declarers by their ID */ - private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>(); - // needs to be a class member for internal testing purpose - private StormTopology stormTopology; - - - /** - * Creates a Flink program that uses the specified spouts and bolts. 
- */ - public FlinkTopology createTopology() { - this.stormTopology = this.stormBuilder.createTopology(); - - final FlinkTopology env = new FlinkTopology(); - env.setParallelism(1); - - final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<String, HashMap<String, DataStream<Tuple>>>(); - - for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) { - final String spoutId = spout.getKey(); - final IRichSpout userSpout = spout.getValue(); - - final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); - userSpout.declareOutputFields(declarer); - final HashMap<String,Fields> sourceStreams = declarer.outputStreams; - this.outputStreams.put(spoutId, sourceStreams); - declarers.put(spoutId, declarer); - - - final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>(); - final DataStreamSource<?> source; - - if (sourceStreams.size() == 1) { - final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null); - spoutWrapperSingleOutput.setStormTopology(stormTopology); - - final String outputStreamId = (String) sourceStreams.keySet().toArray()[0]; - - DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId, - declarer.getOutputType(outputStreamId)); - - outputStreams.put(outputStreamId, src); - source = src; - } else { - final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>( - userSpout, spoutId, null, null); - spoutWrapperMultipleOutputs.setStormTopology(stormTopology); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource( - spoutWrapperMultipleOutputs, spoutId, - (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class)); - - SplitStream<SplitStreamType<Tuple>> splitSource = multiSource - .split(new StormStreamSelector<Tuple>()); - for (String streamId : 
sourceStreams.keySet()) { - SingleOutputStreamOperator<Tuple, ?> outStream = splitSource.select(streamId) - .map(new SplitStreamMapper<Tuple>()); - outStream.getTransformation().setOutputType(declarer.getOutputType(streamId)); - outputStreams.put(streamId, outStream); - } - source = multiSource; - } - availableInputs.put(spoutId, outputStreams); - - int dop = 1; - final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common(); - if (common.is_set_parallelism_hint()) { - dop = common.get_parallelism_hint(); - source.setParallelism(dop); - } else { - common.set_parallelism_hint(1); - } - env.increaseNumberOfTasks(dop); - } - - final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>(); - unprocessedBolts.putAll(this.bolts); - - final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt = - new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>(); - - /* Because we do not know the order in which an iterator steps over a set, we might process a consumer before - * its producer - * ->thus, we might need to repeat multiple times - */ - boolean makeProgress = true; - while (unprocessedBolts.size() > 0) { - if (!makeProgress) { - throw new RuntimeException( - "Unable to build Topology. 
Could not connect the following bolts: " - + unprocessedBolts.keySet()); - } - makeProgress = false; - - final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator(); - while (boltsIterator.hasNext()) { - - final Entry<String, IRichBolt> bolt = boltsIterator.next(); - final String boltId = bolt.getKey(); - final IRichBolt userBolt = bolt.getValue(); - - final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common(); - - Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId); - if (unprocessedInputs == null) { - unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>(); - unprocessedInputs.addAll(common.get_inputs().entrySet()); - unprocessdInputsPerBolt.put(boltId, unprocessedInputs); - } - - // connect each available producer to the current bolt - final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator(); - while (inputStreamsIterator.hasNext()) { - - final Entry<GlobalStreamId, Grouping> stormInputStream = inputStreamsIterator.next(); - final String producerId = stormInputStream.getKey().get_componentId(); - final String inputStreamId = stormInputStream.getKey().get_streamId(); - - final HashMap<String, DataStream<Tuple>> producer = availableInputs.get(producerId); - if (producer != null) { - makeProgress = true; - - DataStream<Tuple> inputStream = producer.get(inputStreamId); - if (inputStream != null) { - final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); - userBolt.declareOutputFields(declarer); - final HashMap<String, Fields> boltOutputStreams = declarer.outputStreams; - this.outputStreams.put(boltId, boltOutputStreams); - this.declarers.put(boltId, declarer); - - // if producer was processed already - final Grouping grouping = stormInputStream.getValue(); - if (grouping.is_set_shuffle()) { - // Storm uses a round-robin shuffle strategy - inputStream = inputStream.rebalance(); - } else if 
(grouping.is_set_fields()) { - // global grouping is emulated in Storm via an empty fields grouping list - final List<String> fields = grouping.get_fields(); - if (fields.size() > 0) { - FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId); - inputStream = inputStream.keyBy(prodDeclarer - .getGroupingFieldIndexes(inputStreamId, - grouping.get_fields())); - } else { - inputStream = inputStream.global(); - } - } else if (grouping.is_set_all()) { - inputStream = inputStream.broadcast(); - } else if (!grouping.is_set_local_or_shuffle()) { - throw new UnsupportedOperationException( - "Flink only supports (local-or-)shuffle, fields, all, and global grouping"); - } - - final SingleOutputStreamOperator<?, ?> outputStream; - - if (boltOutputStreams.size() < 2) { // single output stream or sink - String outputStreamId = null; - if (boltOutputStreams.size() == 1) { - outputStreamId = (String) boltOutputStreams.keySet().toArray()[0]; - } - final TypeInformation<Tuple> outType = declarer - .getOutputType(outputStreamId); - - final BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, Tuple>( - userBolt, boltId, this.outputStreams.get(producerId).get( - inputStreamId), null); - boltWrapperSingleOutput.setStormTopology(stormTopology); - - final SingleOutputStreamOperator<Tuple, ?> outStream = inputStream - .transform(boltId, outType, boltWrapperSingleOutput); - - if (outType != null) { - // only for non-sink nodes - final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>(); - op.put(outputStreamId, outStream); - availableInputs.put(boltId, op); - } - outputStream = outStream; - } else { - final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<Tuple, SplitStreamType<Tuple>>( - userBolt, boltId, this.outputStreams.get(producerId).get( - inputStreamId), null); - boltWrapperMultipleOutputs.setStormTopology(stormTopology); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - 
final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor - .getForClass(SplitStreamType.class); - - final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream = inputStream - .transform(boltId, outType, boltWrapperMultipleOutputs); - - final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream - .split(new StormStreamSelector<Tuple>()); - - final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>(); - for (String outputStreamId : boltOutputStreams.keySet()) { - SingleOutputStreamOperator<Tuple, ?> outStream = splitStream - .select(outputStreamId).map( - new SplitStreamMapper<Tuple>()); - outStream.getTransformation().setOutputType( - declarer.getOutputType(outputStreamId)); - op.put(outputStreamId, outStream); - } - availableInputs.put(boltId, op); - outputStream = multiStream; - } - - int dop = 1; - if (common.is_set_parallelism_hint()) { - dop = common.get_parallelism_hint(); - outputStream.setParallelism(dop); - } else { - common.set_parallelism_hint(1); - } - env.increaseNumberOfTasks(dop); - - inputStreamsIterator.remove(); - } else { - throw new RuntimeException("Cannot connect '" + boltId + "' to '" - + producerId + "'. Stream '" + inputStreamId + "' not found."); - } - } - } - - if (unprocessedInputs.size() == 0) { - // all inputs are connected; processing bolt completed - boltsIterator.remove(); - } - } - } - return env; - } - - /** - * Define a new bolt in this topology with parallelism of just one thread. - * - * @param id - * the id of this component. This id is referenced by other components that want to consume this bolt's - * outputs. - * @param bolt - * the bolt - * @return use the returned object to declare the inputs to this component - */ - public BoltDeclarer setBolt(final String id, final IRichBolt bolt) { - return this.setBolt(id, bolt, null); - } - - /** - * Define a new bolt in this topology with the specified amount of parallelism. 
- * - * @param id - * the id of this component. This id is referenced by other components that want to consume this bolt's - * outputs. - * @param bolt - * the bolt - * @param parallelism_hint - * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a - * process somewhere around the cluster. - * @return use the returned object to declare the inputs to this component - */ - public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) { - final BoltDeclarer declarer = this.stormBuilder.setBolt(id, bolt, parallelism_hint); - this.bolts.put(id, bolt); - return declarer; - } - - /** - * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted - * kind - * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to - * achieve proper reliability in the topology. - * - * @param id - * the id of this component. This id is referenced by other components that want to consume this bolt's - * outputs. - * @param bolt - * the basic bolt - * @return use the returned object to declare the inputs to this component - */ - public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) { - return this.setBolt(id, bolt, null); - } - - /** - * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted - * kind - * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to - * achieve proper reliability in the topology. - * - * @param id - * the id of this component. This id is referenced by other components that want to consume this bolt's - * outputs. - * @param bolt - * the basic bolt - * @param parallelism_hint - * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a - * process somewhere around the cluster. 
- * @return use the returned object to declare the inputs to this component - */ - public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) { - return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint); - } - - /** - * Define a new spout in this topology. - * - * @param id - * the id of this component. This id is referenced by other components that want to consume this spout's - * outputs. - * @param spout - * the spout - */ - public SpoutDeclarer setSpout(final String id, final IRichSpout spout) { - return this.setSpout(id, spout, null); - } - - /** - * Define a new spout in this topology with the specified parallelism. If the spout declares itself as - * non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component. - * - * @param id - * the id of this component. This id is referenced by other components that want to consume this spout's - * outputs. - * @param parallelism_hint - * the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a - * process somewhere around the cluster. 
- * @param spout - * the spout - */ - public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) { - final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint); - this.spouts.put(id, spout); - return declarer; - } - - // TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself) - /* not implemented by Storm 0.9.4 - * public void setStateSpout(final String id, final IRichStateSpout stateSpout) { - * this.stormBuilder.setStateSpout(id, stateSpout); - * } - * public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) { - * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint); - * } - */ - - // for internal testing purpose only - StormTopology getStormTopology() { - return this.stormTopology; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java index 6550990..38ce58c 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java @@ -17,15 +17,14 @@ */ package org.apache.flink.storm.util; +import backtype.storm.Config; +import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; + import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; - -import backtype.storm.Config; - /** * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config} * object) for embedded Spouts 
and Bolts. http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java index d9f4178..6072e0f 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java @@ -17,15 +17,15 @@ */ package org.apache.flink.storm.util; +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import org.apache.flink.storm.api.FlinkTopologyBuilder; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; - /** - * Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink. + * Used by {@link FlinkTopology} to split multiple declared output streams within Flink. 
*/ public final class StormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> { private static final long serialVersionUID = 2553423379715401023L; http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java index ee06f0a..a0115f3 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java @@ -16,15 +16,13 @@ */ package org.apache.flink.storm.wrappers; -import java.util.Collection; -import java.util.HashMap; - import backtype.storm.generated.StormTopology; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.tuple.Fields; - +import backtype.storm.utils.Utils; +import com.google.common.collect.Sets; import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; @@ -36,7 +34,8 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.HashMap; /** * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming @@ -53,21 +52,33 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements private static final long serialVersionUID = -4788589118464155835L; /** The wrapped Storm {@link 
IRichBolt bolt}. */ - private final IRichBolt bolt; + protected final IRichBolt bolt; /** The name of the bolt. */ private final String name; /** Number of attributes of the bolt's output tuples per stream. */ - private final HashMap<String, Integer> numberOfAttributes; + protected final HashMap<String, Integer> numberOfAttributes; /** The schema (ie, ordered field names) of the input stream. */ - private final Fields inputSchema; + protected final Fields inputSchema; /** The original Storm topology. */ protected StormTopology stormTopology; + /** The topology context of the bolt */ + protected transient TopologyContext topologyContext; + + /** The component id of the input stream for this bolt */ + protected final String inputComponentId; + + /** The stream id of the input stream for this bolt */ + protected final String inputStreamId; + + public final static String DEFAULT_OPERATOR_ID = "defaultID"; + public final static String DEFUALT_BOLT_NAME = "defaultBoltName"; + /** * We have to use this because Operators must output * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}. */ - private transient TimestampedCollector<OUT> flinkCollector; + protected transient TimestampedCollector<OUT> flinkCollector; /** * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be @@ -75,8 +86,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's * declared number of attributes. * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. + * @param bolt The Storm {@link IRichBolt bolt} to be used. * @throws IllegalArgumentException * If the number of declared output attributes is not with range [0;25]. */ @@ -89,11 +99,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * used within a Flink streaming program. 
The given input schema enable attribute-by-name access for input types * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on * the bolt's declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. - * @param inputSchema - * The schema (ie, ordered field names) of the input stream. + * @param bolt The Storm {@link IRichBolt bolt} to be used. + * @param inputSchema The schema (ie, ordered field names) of the input stream. @throws IllegalArgumentException + * * @throws IllegalArgumentException * If the number of declared output attributes is not with range [0;25]. */ @@ -108,16 +116,13 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. + * @param bolt The Storm {@link IRichBolt bolt} to be used. * @param rawOutputs * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be * of a raw type. * @throws IllegalArgumentException * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [1;25]. + * {@code rawOuput} is {@code false} and the number of declared output attributes is not within range [1;25]. */ public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) throws IllegalArgumentException { @@ -131,8 +136,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * bolt's number of declared output tuples is 1. 
If {@code rawOutput} is {@code false} the output type will be one * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. + * @param bolt The Storm {@link IRichBolt bolt} to be used. * @param rawOutputs * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be * of a raw type. @@ -153,8 +157,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. + * @param bolt The Storm {@link IRichBolt bolt} to be used. * @param inputSchema * The schema (ie, ordered field names) of the input stream. * @param rawOutputs @@ -166,7 +169,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * [0;25]. */ public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, - final String[] rawOutputs) throws IllegalArgumentException { + final String[] rawOutputs) throws IllegalArgumentException { this(bolt, inputSchema, Sets.newHashSet(rawOutputs)); } @@ -176,11 +179,11 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. + * @param bolt The Storm {@link IRichBolt bolt} to be used. * @param inputSchema - * The schema (ie, ordered field names) of the input stream. 
+ * The schema (ie, ordered field names) of the input stream. @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range * @param rawOutputs * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be * of a raw type. @@ -190,8 +193,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * [0;25]. */ public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, - final Collection<String> rawOutputs) throws IllegalArgumentException { - this(bolt, null, inputSchema, rawOutputs); + final Collection<String> rawOutputs) throws IllegalArgumentException { + this(bolt, DEFUALT_BOLT_NAME, Utils.DEFAULT_STREAM_ID, DEFAULT_OPERATOR_ID, inputSchema, rawOutputs); } /** @@ -201,10 +204,10 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. - * @param name - * The name of the bolt. + * @param bolt The Storm {@link IRichBolt bolt} to be used. + * @param name The name of the bolt. + * @param inputStreamId The stream id of the input stream for this bolt + * @param inputComponentId The component id of the input stream for this bolt * @param inputSchema * The schema (ie, ordered field names) of the input stream. * @param rawOutputs @@ -215,10 +218,13 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range * [0;25]. 
*/ - public BoltWrapper(final IRichBolt bolt, final String name, final Fields inputSchema, - final Collection<String> rawOutputs) throws IllegalArgumentException { + public BoltWrapper(final IRichBolt bolt, final String name, + final String inputStreamId, final String inputComponentId, + final Fields inputSchema, final Collection<String> rawOutputs) throws IllegalArgumentException { this.bolt = bolt; this.name = name; + this.inputComponentId = inputComponentId; + this.inputStreamId = inputStreamId; this.inputSchema = inputSchema; this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs); } @@ -237,7 +243,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements public void open() throws Exception { super.open(); - this.flinkCollector = new TimestampedCollector<OUT>(output); + this.flinkCollector = new TimestampedCollector<>(output); final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>( this.numberOfAttributes, flinkCollector)); @@ -252,9 +258,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements } } - final TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext( + topologyContext = WrapperSetupHelper.createTopologyContext( getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig); - this.bolt.prepare(stormConfig, topologyContext, stormCollector); } @@ -267,7 +272,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements public void processElement(final StreamRecord<IN> element) throws Exception { this.flinkCollector.setTimestamp(element.getTimestamp()); IN value = element.getValue(); - this.bolt.execute(new StormTuple<IN>(value, inputSchema)); + this.bolt.execute(new StormTuple<>(value, inputSchema, topologyContext.getThisTaskId(), inputStreamId, inputComponentId)); } @Override 
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java new file mode 100644 index 0000000..02ffa51 --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.wrappers; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.tuple.Fields; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collection; + +/** + * A {@link BoltWrapperTwoInput} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming + * program. In contrast to {@link BoltWrapper}, this wrapper takes two input streams as input. + */ +public class BoltWrapperTwoInput<IN1, IN2, OUT> extends BoltWrapper<IN1, OUT> implements TwoInputStreamOperator<IN1, IN2, OUT> { + + /** The schema (ie, ordered field names) of the second input stream. */ + private final Fields inputSchema2; + + /** The component id of the second input stream of the bolt */ + private final String componentId2; + /** The stream id of the second input stream of the bolt */ + private final String streamId2; + + /** + * Instantiates a new {@link BoltWrapperTwoInput} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types + * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} + * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will + * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. + * @param bolt The Storm {@link IRichBolt bolt} to be used. + * @param boltId The name of the bolt. 
+ * @param streamId1 The stream id of the first input stream for this bolt + * @param componentId2 The component id of the second input stream for this bolt + * @param inputSchema1 + * The schema (ie, ordered field names) of the first input stream. + * @throws IllegalArgumentException + * If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOutput} is {@code false} and the number of declared output attributes is not within range + * */ + public BoltWrapperTwoInput(final IRichBolt bolt, final String boltId, + final String streamId1, final String streamId2, + final String componentId1, final String componentId2, + final Fields inputSchema1, final Fields inputSchema2) throws IllegalArgumentException { + this(bolt, boltId, streamId1, streamId2, componentId1, componentId2, inputSchema1, inputSchema2, null); + } + + /** + * Instantiates a new {@link BoltWrapperTwoInput} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types + * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} + * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will + * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. + * @param bolt The Storm {@link IRichBolt bolt} to be used. + * @param boltId The name of the bolt. 
+ * @param streamId1 The stream id of the first input stream for this bolt + * @param streamId2 The stream id of the second input stream for this bolt + * @param componentId1 The component id of the first input stream for this bolt + * @param componentId2 The component id of the second input stream for this bolt + * @param inputSchema1 + * The schema (ie, ordered field names) of the first input stream. 
+ * @param inputSchema2 + * The schema (ie, ordered field names) of the second input stream. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. + * @throws IllegalArgumentException + * If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOutput} is {@code false} and the number of declared output attributes is not within range + */ + public BoltWrapperTwoInput(final IRichBolt bolt, final String boltId, + final String streamId1, final String streamId2, + final String componentId1, final String componentId2, + final Fields inputSchema1, final Fields inputSchema2, + final Collection<String> rawOutputs) throws IllegalArgumentException { + super(bolt, boltId, streamId1, componentId1, inputSchema1, rawOutputs); + this.componentId2 = componentId2; + this.streamId2 = streamId2; + this.inputSchema2 = inputSchema2; + } + + /** + * Sets the original Storm topology. + * + * @param stormTopology + * The original Storm topology. 
+ */ + public void setStormTopology(StormTopology stormTopology) { + this.stormTopology = stormTopology; + } + + + @Override + public void processElement1(final StreamRecord<IN1> element) throws Exception { + super.processElement(element); + } + + @Override + public void processElement2(StreamRecord<IN2> element) throws Exception { + this.flinkCollector.setTimestamp(element.getTimestamp()); + IN2 value = element.getValue(); + this.bolt.execute(new StormTuple<>(value, inputSchema2, topologyContext.getThisTaskId(), streamId2, componentId2)); + } + + @Override + public void processWatermark1(Watermark mark) throws Exception { + super.processWatermark(mark); + } + + @Override + public void processWatermark2(Watermark mark) throws Exception { + this.output.emitWatermark(mark); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java index 68368bf..db1d147 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java @@ -27,13 +27,12 @@ import backtype.storm.metric.api.ReducedMetric; import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; import java.util.Collection; import java.util.List; import java.util.Map; -import clojure.lang.Atom; - /** * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites certain method that are not applicable when * a Storm topology is executed within Flink.