[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1398#issuecomment-161412796 Thanks for your patience. :)
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1398#issuecomment-161304907 Thanks for your feedback!
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1398
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1398#issuecomment-160985940 If Travis is green, please merge. You can fix the last tiny comments directly before merging. No need to update this PR.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46284171 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java --- @@ -108,16 +114,13 @@ public BoltWrapper(final IRichBolt bolt, final Fields inputSchema) * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. -* -* @param bolt -*The Storm {@link IRichBolt bolt} to be used. +* @param bolt The Storm {@link IRichBolt bolt} to be used. * @param rawOutputs *Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be *of a raw type. * @throws IllegalArgumentException * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range -* [1;25]. --- End diff -- Please keep and update: should be `[0;25]`
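For reference, a sketch of how the retained `@throws` clause could read after that update (indentation mirrors the surrounding Javadoc; only the `[0;25]` bound is the reviewer's correction, the wording otherwise follows the existing text):

```java
 * @throws IllegalArgumentException
 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1, or if
 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
 *             [0;25].
```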
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46284474 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +47,32 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + /** The task id where this tuple is processed */ + private final int taskId; + /** The producer of this tuple */ + private final String producerStreamId; + /** The producer's component id of this tuple */ + private final String producerComponentId; + /*+ The message that is associated with this tuple */ --- End diff -- `/**` not `+`
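The corrected declaration would simply use the regular Javadoc opener; assuming the `id` field name that appears elsewhere in this diff:

```java
/** The message that is associated with this tuple */
private final MessageId id;
```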
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46284023 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java --- @@ -89,13 +99,9 @@ public BoltWrapper(final IRichBolt bolt) throws IllegalArgumentException { * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on * the bolt's declared number of attributes. -* -* @param bolt -*The Storm {@link IRichBolt bolt} to be used. +* @param bolt The Storm {@link IRichBolt bolt} to be used. * @param inputSchema -*The schema (ie, ordered field names) of the input stream. -* @throws IllegalArgumentException --- End diff -- Please keep `@throws`
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1398#issuecomment-160982088 I've addressed your comments.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46281582 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java --- @@ -265,12 +264,12 @@ public void testOpen() throws Exception { @Test public void testOpenSink() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); - BoltWrapper wrapper = new BoltWrapper(bolt); + BoltWrapper wrapper = new BoltWrapper(bolt, "stream", "component"); wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.open(); - verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class)); + verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class)); --- End diff -- I see. Makes sense.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46279370 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java --- @@ -72,7 +71,7 @@ public static void main(final String[] args) throws Exception { .transform("StormBoltTokenizer", TypeExtractor.getForObject(""), new BoltWrapper(new ExclamationBolt(), - new String[] { Utils.DEFAULT_STREAM_ID })) + "stream", "component", new String[] { Utils.DEFAULT_STREAM_ID })) --- End diff -- Can do.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46278994 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -193,24 +189,22 @@ public void testCreateTopologyContext() { Utils.sleep(++counter * 1); cluster.shutdown(); - if (TestSink.result.size() == 8) { + if (TestSink.result.size() >= 4) { --- End diff -- The Storm executor sometimes returned more results for me. I've adjusted it to a fixed size again. I think the important thing here is that we check all the returned results.
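A sketch of the stricter check being described, with `expectedResultCount` as a placeholder for the fixed size the test settles on (not the actual test code):

```java
// Assert the exact number of results and then verify every returned element,
// instead of accepting "at least N" results.
Assert.assertEquals(expectedResultCount, TestSink.result.size());
for (Object result : TestSink.result) {
    Assert.assertNotNull(result); // replace with the real per-result verification
}
```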
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46277213 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java --- @@ -265,12 +264,12 @@ public void testOpen() throws Exception { @Test public void testOpenSink() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); - BoltWrapper wrapper = new BoltWrapper(bolt); + BoltWrapper wrapper = new BoltWrapper(bolt, "stream", "component"); wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.open(); - verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class)); + verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class)); --- End diff -- `any` also matches null (literally anything), but here I want to explicitly check `isNotNull`.
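As a self-contained illustration of the distinction, assuming Mockito 1.x semantics (as used here), where `any(Class)` also matches `null`; the class and variable names below are invented for the demo:

```java
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.List;

public class MatcherDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> calls = mock(List.class);

        calls.add("first");
        calls.add(null);

        // isNotNull(...) matches only the invocation with a real argument ...
        verify(calls).add(isNotNull(String.class));
        // ... whereas any(...) also matches null, so it counts both invocations.
        verify(calls, times(2)).add(any(String.class));
    }
}
```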
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46276898 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java --- @@ -224,7 +224,7 @@ static synchronized TopologyContext createTopologyContext( *OUTPUT: A map from all component IDs to there output streams and output fields. * * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current -* Flink operator ({@link operatorName}) -- {@code null} otherwise. +* Flink operator ({@param operatorName}) -- {@code null} otherwise. --- End diff -- Ok, let's use code. http://stackoverflow.com/questions/1667212/reference-a-method-parameter-in-javadoc
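For context: standard Javadoc has no inline `{@param}` tag, so `{@code}` is the usual way to reference a method parameter. The amended line could read roughly like this:

```java
 * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current
 *         Flink operator ({@code operatorName}) -- {@code null} otherwise.
```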
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46035298 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java --- @@ -224,7 +224,7 @@ static synchronized TopologyContext createTopologyContext( *OUTPUT: A map from all component IDs to there output streams and output fields. * * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current -* Flink operator ({@link operatorName}) -- {@code null} otherwise. +* Flink operator ({@param operatorName}) -- {@code null} otherwise. --- End diff -- I guess `@link` is wrong, but does `@param` work? Maybe `@code` would be correct? But I am not sure.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46035118 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java --- @@ -265,12 +264,12 @@ public void testOpen() throws Exception { @Test public void testOpenSink() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); - BoltWrapper wrapper = new BoltWrapper(bolt); + BoltWrapper wrapper = new BoltWrapper(bolt, "stream", "component"); wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.open(); - verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class)); + verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class)); --- End diff -- Just out of curiosity: Why `isNotNull` instead of `any`?
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46034951 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -193,24 +189,22 @@ public void testCreateTopologyContext() { Utils.sleep(++counter * 1); cluster.shutdown(); - if (TestSink.result.size() == 8) { + if (TestSink.result.size() >= 4) { --- End diff -- Why `>=` and not `==`?
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46033750 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java --- @@ -64,7 +63,7 @@ public static void main(final String[] args) throws Exception { // this is done by a bolt that is wrapped accordingly .transform("BoltTokenizer", TypeExtractor.getForObject(new Tuple2("", 0)), - new BoltWrapper>(new BoltTokenizer())) + new BoltWrapper>(new BoltTokenizer(), "stream", "component")) --- End diff -- Same question here.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46033770 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java --- @@ -71,7 +70,7 @@ public static void main(final String[] args) throws Exception { // this is done by a bolt that is wrapped accordingly .transform("BoltTokenizerPojo", TypeExtractor.getForObject(new Tuple2("", 0)), - new BoltWrapper>(new BoltTokenizerByName())) + new BoltWrapper>(new BoltTokenizerByName(), "stream", "component")) --- End diff -- Again.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46033787 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java --- @@ -75,7 +74,7 @@ public static void main(final String[] args) throws Exception { "BoltTokenizerWithNames", TypeExtractor.getForObject(new Tuple2("", 0)), new BoltWrapper, Tuple2>( - new BoltTokenizerByName(), new Fields("sentence"))) + new BoltTokenizerByName(), "stream", "component", new Fields("sentence"))) --- End diff -- Again.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46033705 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java --- @@ -70,7 +70,7 @@ public static void main(final String[] args) throws Exception { oddStream.transform("oddBolt", TypeExtractor.getForObject(new Tuple2("", 0)), new BoltWrapper, Tuple2>( - new VerifyAndEnrichBolt(false))) + new VerifyAndEnrichBolt(false), "stream", "component")) --- End diff -- Default values for both? Or use `ODD_STREAM` for consistency.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r46033582 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java --- @@ -72,7 +71,7 @@ public static void main(final String[] args) throws Exception { .transform("StormBoltTokenizer", TypeExtractor.getForObject(""), new BoltWrapper(new ExclamationBolt(), - new String[] { Utils.DEFAULT_STREAM_ID })) + "stream", "component", new String[] { Utils.DEFAULT_STREAM_ID })) --- End diff -- Can't we use default values for streamID and componentID here, i.e., omit both parameters?
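A sketch of what such defaults could look like, using a hypothetical constant and constructor delegation (the actual set of `BoltWrapper` constructors may differ):

```java
// Hypothetical convenience constructor: delegate to the full constructor with
// default IDs so examples need not pass placeholder stream/component names.
public static final String DEFAULT_ID = "default ID";

public BoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
    this(bolt, Utils.DEFAULT_STREAM_ID, DEFAULT_ID, (Fields) null);
}
```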
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45971856 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java --- @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.print; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import storm.starter.bolt.PrinterBolt; +import storm.starter.spout.TwitterSampleSpout; + +import java.util.Arrays; + +/** + * Prints incoming tweets. Tweets can be filtered by keywords. + */ +public class PrintSampleStream { + public static void main(String[] args) throws Exception { --- End diff -- The problem was that the `BoltWrapper` wouldn't create a `BoltCollector` if the bolt didn't define any output fields. That led to a NullPointerException.
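To make the failure mode concrete, here is a minimal, self-contained toy (hypothetical names, not the actual Flink classes): when a collector is only created for components that declare output fields, a pure sink bolt is prepared with `null` and the first emit attempt throws a `NullPointerException`; always handing out a (possibly no-op) collector avoids that.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class NullCollectorDemo {

    interface Collector {
        void emit(List<Object> values);
    }

    static Collector createCollector(Map<String, Integer> declaredOutputs) {
        // Old behaviour (sketch): no declared outputs -> no collector -> NPE later.
        // return declaredOutputs.isEmpty() ? null : values -> { /* forward */ };

        // Safer behaviour: always return a collector; it is a no-op for
        // components without declared outputs.
        return values -> { /* nothing to forward */ };
    }

    public static void main(String[] args) {
        Collector collector = createCollector(Collections.<String, Integer>emptyMap());
        collector.emit(Collections.<Object>singletonList("tweet")); // no NPE
    }
}
```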
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45971476 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } --- End diff -- Fair enough, I use default id constants now.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45970656 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } --- End diff -- Ok. I guess it would make sense to use Storm's `Utils.DEFAULT_STREAM_ID` here? And maybe add a `public final static String DEFAULT_OPERATOR_ID` variable to `StormTuple`? What about using "defaultID" or "unspecified" instead of "componentID" or similar, just so the name is recognizable if it ever shows up in the UI?
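A fragment-level sketch of that suggestion against the constructor shown in the diff (the constant names are hypothetical; `Utils.DEFAULT_STREAM_ID` is Storm's own constant):

```java
// Hypothetical named defaults, so the values are recognizable if they ever
// show up in the UI.
public static final String DEFAULT_PRODUCER_COMPONENT_ID = "unspecified";
public static final int DEFAULT_TASK_ID = -1;

StormTuple(final IN flinkTuple, final Fields schema) {
    this(flinkTuple, schema, DEFAULT_TASK_ID, Utils.DEFAULT_STREAM_ID,
            DEFAULT_PRODUCER_COMPONENT_ID);
}
```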
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45970389 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- Good, we are on the same page. And I don't want to bully you! I only mentioned the classes that do not contain any actual code change. (Actually, according to the coding guidelines, there should be no import-order changes even in the classes with code changes, but I did not comment on those, only on the classes with pure reformatting.) I like consistency, so please apply the changes to all classes. But when I did import re-orderings or made code formatting consistent (where it was inconsistent), I was always told "don't do this". So if it is a general rule, I just point it out here, too. I did not come up with the rule, and I never force my own code style; I always adapt to the given style. :) It's really about time to get a proper Maven formatting tool running to get rid of all these discussions. (As I said already: "It is not against you or the change itself." But the process seems to be inconsistent; people follow the rules more or less strictly.)
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45967336 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- Not sure who is bullying whom :) Look at the classes and you will see that all imports are arranged like this. We want to be consistent, right? Following your suggestion, I changed the other import statements that were just reformatting. Open source is often about compromises; you will rarely find that another person's code style reflects exactly how you would do it. I'm making compromises and changing things the way you like them, and that's fine by me. Please don't give me a harder time by blaming my employer. I'm not aware of having done something like this to you. Next time you get blamed for something like this, please contact me and I'll try to help you. I don't think this is the right place to sort these things out.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45966621 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java --- @@ -20,11 +20,9 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45965523 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- I see. dataArtisans committers can do any change, but external committers get bullied if they apply similar changes... It is not against you or the change itself; it unifies the style, which does make sense. But I got bullied multiple times in other PRs when I did similar stuff...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45965316 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java --- @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.print; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import storm.starter.bolt.PrinterBolt; +import storm.starter.spout.TwitterSampleSpout; + +import java.util.Arrays; + +/** + * Prints incoming tweets. Tweets can be filtered by keywords. + */ +public class PrintSampleStream { + public static void main(String[] args) throws Exception { --- End diff -- I cannot see why it did not work before. Can you explain what the problem was?
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964853 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java --- @@ -26,6 +24,8 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import java.util.Map; + public class TestDummyBolt implements IRichBolt { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964876 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java --- @@ -16,16 +16,16 @@ */ package org.apache.flink.storm.util; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + public class TestSink implements IRichBolt { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964832 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java --- @@ -16,13 +16,13 @@ */ package org.apache.flink.storm.api; -import java.util.Map; - import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; +import java.util.Map; + public class TestSpout implements IRichSpout { private static final long serialVersionUID = -4884029383198924007L; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964866 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java --- @@ -26,6 +24,8 @@ import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; +import java.util.Map; + public class TestDummySpout implements IRichSpout { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964904 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java --- @@ -21,7 +21,6 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.tuple.Fields; - import org.apache.flink.api.common.ExecutionConfig; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964776 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java --- @@ -16,14 +16,14 @@ */ package org.apache.flink.storm.api; -import java.util.Map; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import java.util.Map; + public class TestBolt implements IRichBolt { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964692 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(); - Assert.assertEquals(1, topology.getParallelism()); + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecute() throws Exception { - new FlinkTopology().execute(); + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecuteWithName() throws Exception { - new FlinkTopology().execute(null); + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + + FlinkTopology.createTopology(builder); } @Test - public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(); + @Ignore + public void testFieldsGroupingOnMultipleSpoutOutputStreams() { + TopologyBuilder builder = new TopologyBuilder(); - Assert.assertEquals(0, topology.getNumberOfTasks()); + builder.setSpout("spout", new TestDummySpout()); + builder.setBolt("sink", new TestSink()).fieldsGrouping("spout", + TestDummySpout.spoutStreamId, new Fields("id")); - topology.increaseNumberOfTasks(3); - Assert.assertEquals(3, topology.getNumberOfTasks()); + FlinkTopology.createTopology(builder); + } - topology.increaseNumberOfTasks(2); - Assert.assertEquals(5, topology.getNumberOfTasks()); + @Test + @Ignore --- End diff -- ok --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964610 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(); - Assert.assertEquals(1, topology.getParallelism()); + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecute() throws Exception { - new FlinkTopology().execute(); + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecuteWithName() throws Exception { - new FlinkTopology().execute(null); + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + + FlinkTopology.createTopology(builder); } @Test - public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(); + @Ignore --- End diff -- ok --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964241 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java --- @@ -18,9 +18,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964023 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java --- @@ -17,12 +17,12 @@ package org.apache.flink.storm.wrappers; -import java.util.HashMap; - --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45963943 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java --- @@ -33,7 +30,8 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.HashMap; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45963933 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } --- End diff -- The use of null is often problematic. I prefer default values.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45963921 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45896492 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -15,75 +16,474 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.IRichStateSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology} - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or - * {@link FlinkClient}. + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program. + * CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported. 
*/ -public class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology { + + /** All declared streams and output schemas by operator ID */ + private final HashMap> outputStreams = new HashMap>(); + /** All spouts&bolts declarers by their ID */ + private final HashMap declarers = new HashMap(); + + private final HashMap>> unprocessdInputsPerBolt = + new HashMap>>(); + + final HashMap>> availableInputs = new HashMap<>(); - /** The number of declared tasks for the whole program (ie, sum over all dops) */ - private int numberOfTasks = 0; + private final TopologyBuilder builder; - public FlinkTopology() { - // Set default parallelism to 1, to mirror Storm default behavior - super.setParallelism(1); + // needs to be a class member for internal testing purpose + private final StormTopology stormTopology; + + private final Map spouts; + private final Map bolts; + + private final StreamExecutionEnvironment env; + + private FlinkTopology(TopologyBuilder builder) { + this.builder = builder; + this.stormTopology = builder.createTopology(); + // extract the spouts and bolts + this.spouts = getPrivateField("_spouts"); + this.bolts = getPrivateField("_bolts"); + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Kick off the translation immediately + translateTopology(); } /** -* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link -* FlinkClient}. * -* @throws UnsupportedOperationException -* at every invocation +* Creates a Flink program that uses the specified spouts and bolts. +* @param stormBuilder The storm topology builder to use for creating the Flink topology. +* @return A Flink Topology which may be executed. --- End diff -- Sorry for b
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45895556 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -15,75 +16,474 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.IRichStateSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology} - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or - * {@link FlinkClient}. + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program. + * CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported. 
*/ -public class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology { + + /** All declared streams and output schemas by operator ID */ + private final HashMap> outputStreams = new HashMap>(); + /** All spouts&bolts declarers by their ID */ + private final HashMap declarers = new HashMap(); + + private final HashMap>> unprocessdInputsPerBolt = + new HashMap>>(); + + final HashMap>> availableInputs = new HashMap<>(); - /** The number of declared tasks for the whole program (ie, sum over all dops) */ - private int numberOfTasks = 0; + private final TopologyBuilder builder; - public FlinkTopology() { - // Set default parallelism to 1, to mirror Storm default behavior - super.setParallelism(1); + // needs to be a class member for internal testing purpose + private final StormTopology stormTopology; + + private final Map spouts; + private final Map bolts; + + private final StreamExecutionEnvironment env; + + private FlinkTopology(TopologyBuilder builder) { + this.builder = builder; + this.stormTopology = builder.createTopology(); + // extract the spouts and bolts + this.spouts = getPrivateField("_spouts"); + this.bolts = getPrivateField("_bolts"); + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Kick off the translation immediately + translateTopology(); } /** -* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link -* FlinkClient}. * -* @throws UnsupportedOperationException -* at every invocation +* Creates a Flink program that uses the specified spouts and bolts. +* @param stormBuilder The storm topology builder to use for creating the Flink topology. +* @return A Flink Topology which may be executed. */ - @Override - pu
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45894573 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java --- @@ -613,29 +611,29 @@ public void testGetBinaryByFieldPojoGetter() throws Exception { return new StormTuple(tuple, schema); } - @Test(expected = UnsupportedOperationException.class) + @Test public void testGetSourceGlobalStreamid() { - new StormTuple(null, null).getSourceGlobalStreamid(); + Assert.assertNotNull(new StormTuple(null, null).getSourceGlobalStreamid()); --- End diff -- Can we improve all these tests? Just a check for "not-null" seems a little limited to me. We should rather check for default values and also check the full constructor case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
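A sketch of the stricter checks being asked for here. It assumes the five-argument StormTuple constructor and the defaults (-1, "testStream", "componentID") shown in the StormTuple diff later in this thread, and it assumes the standard Storm Tuple getters (getSourceTask, getSourceStreamId, getSourceComponent) map onto those fields; it is meant as additions to the existing StormTupleTest class, not a standalone file.

    @Test
    public void testSourceMetadataDefaults() {
        // the two-argument constructor is expected to fall back to the hard-coded defaults
        StormTuple<Object> tuple = new StormTuple<Object>(null, null);
        Assert.assertEquals(-1, tuple.getSourceTask());
        Assert.assertEquals("testStream", tuple.getSourceStreamId());
        Assert.assertEquals("componentID", tuple.getSourceComponent());
    }

    @Test
    public void testSourceMetadataFullConstructor() {
        // the full constructor should pass the source metadata through unchanged
        StormTuple<Object> tuple = new StormTuple<Object>(null, null, 42, "someStream", "someComponent");
        Assert.assertEquals(42, tuple.getSourceTask());
        Assert.assertEquals("someStream", tuple.getSourceStreamId());
        Assert.assertEquals("someComponent", tuple.getSourceComponent());
    }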
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45894174 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } --- End diff -- Are these meaningful/helpful defaults? Why not just set them to `null`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
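For comparison, a sketch of the null-based alternative being suggested, keeping the same five-argument delegation as in the diff above; whether null is acceptable for all downstream callers is left open.

    /**
     * Constructor that leaves the source metadata unset instead of using test-flavored defaults.
     */
    StormTuple(final IN flinkTuple, final Fields schema) {
        this(flinkTuple, schema, -1, null, null);
    }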
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45893951 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java --- @@ -21,7 +21,6 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.tuple.Fields; - import org.apache.flink.api.common.ExecutionConfig; --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45893875 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java --- @@ -16,16 +16,16 @@ */ package org.apache.flink.storm.util; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + public class TestSink implements IRichBolt { --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45893806 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java --- @@ -16,13 +16,13 @@ */ package org.apache.flink.storm.api; -import java.util.Map; - import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; +import java.util.Map; + public class TestSpout implements IRichSpout { private static final long serialVersionUID = -4884029383198924007L; --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45893834 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java --- @@ -26,6 +24,8 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import java.util.Map; + public class TestDummyBolt implements IRichBolt { --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45893853 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java --- @@ -26,6 +24,8 @@ import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; +import java.util.Map; + public class TestDummySpout implements IRichSpout { --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45893736 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(); - Assert.assertEquals(1, topology.getParallelism()); + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecute() throws Exception { - new FlinkTopology().execute(); + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecuteWithName() throws Exception { - new FlinkTopology().execute(null); + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + + FlinkTopology.createTopology(builder); } @Test - public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(); + @Ignore --- End diff -- Please enable this test. I forgot to do this in my last commit which fixes this issue... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45893790 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java --- @@ -16,14 +16,14 @@ */ package org.apache.flink.storm.api; -import java.util.Map; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import java.util.Map; + public class TestBolt implements IRichBolt { --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45893753 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(); - Assert.assertEquals(1, topology.getParallelism()); + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecute() throws Exception { - new FlinkTopology().execute(); + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecuteWithName() throws Exception { - new FlinkTopology().execute(null); + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + + FlinkTopology.createTopology(builder); } @Test - public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(); + @Ignore + public void testFieldsGroupingOnMultipleSpoutOutputStreams() { + TopologyBuilder builder = new TopologyBuilder(); - Assert.assertEquals(0, topology.getNumberOfTasks()); + builder.setSpout("spout", new TestDummySpout()); + builder.setBolt("sink", new TestSink()).fieldsGrouping("spout", + TestDummySpout.spoutStreamId, new Fields("id")); - topology.increaseNumberOfTasks(3); - Assert.assertEquals(3, topology.getNumberOfTasks()); + FlinkTopology.createTopology(builder); + } - topology.increaseNumberOfTasks(2); - Assert.assertEquals(5, topology.getNumberOfTasks()); + @Test + @Ignore --- End diff -- Please enable this test. I forgot to do this in my last commit which fixes this issue... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45892911 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -15,75 +16,474 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.IRichStateSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology} - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or - * {@link FlinkClient}. + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program. + * CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported. 
*/ -public class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology { + + /** All declared streams and output schemas by operator ID */ + private final HashMap> outputStreams = new HashMap>(); + /** All spouts&bolts declarers by their ID */ + private final HashMap declarers = new HashMap(); + + private final HashMap>> unprocessdInputsPerBolt = + new HashMap>>(); + + final HashMap>> availableInputs = new HashMap<>(); - /** The number of declared tasks for the whole program (ie, sum over all dops) */ - private int numberOfTasks = 0; + private final TopologyBuilder builder; - public FlinkTopology() { - // Set default parallelism to 1, to mirror Storm default behavior - super.setParallelism(1); + // needs to be a class member for internal testing purpose + private final StormTopology stormTopology; + + private final Map spouts; + private final Map bolts; + + private final StreamExecutionEnvironment env; + + private FlinkTopology(TopologyBuilder builder) { + this.builder = builder; + this.stormTopology = builder.createTopology(); + // extract the spouts and bolts + this.spouts = getPrivateField("_spouts"); + this.bolts = getPrivateField("_bolts"); + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Kick off the translation immediately + translateTopology(); } /** -* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link -* FlinkClient}. * -* @throws UnsupportedOperationException -* at every invocation +* Creates a Flink program that uses the specified spouts and bolts. +* @param stormBuilder The storm topology builder to use for creating the Flink topology. +* @return A Flink Topology which may be executed. */ - @Override - publ
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45892746 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -15,75 +16,474 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.IRichStateSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology} - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or - * {@link FlinkClient}. + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program. + * CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported. 
*/ -public class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology { + + /** All declared streams and output schemas by operator ID */ + private final HashMap> outputStreams = new HashMap>(); + /** All spouts&bolts declarers by their ID */ + private final HashMap declarers = new HashMap(); + + private final HashMap>> unprocessdInputsPerBolt = + new HashMap>>(); + + final HashMap>> availableInputs = new HashMap<>(); - /** The number of declared tasks for the whole program (ie, sum over all dops) */ - private int numberOfTasks = 0; + private final TopologyBuilder builder; - public FlinkTopology() { - // Set default parallelism to 1, to mirror Storm default behavior - super.setParallelism(1); + // needs to be a class member for internal testing purpose + private final StormTopology stormTopology; + + private final Map spouts; + private final Map bolts; + + private final StreamExecutionEnvironment env; + + private FlinkTopology(TopologyBuilder builder) { + this.builder = builder; + this.stormTopology = builder.createTopology(); + // extract the spouts and bolts + this.spouts = getPrivateField("_spouts"); + this.bolts = getPrivateField("_bolts"); + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Kick off the translation immediately + translateTopology(); } /** -* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link -* FlinkClient}. * -* @throws UnsupportedOperationException -* at every invocation +* Creates a Flink program that uses the specified spouts and bolts. +* @param stormBuilder The storm topology builder to use for creating the Flink topology. +* @return A Flink Topology which may be executed. --- End diff -- Let me get th
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45892009 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { --- End diff -- Why was this test removed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45891849 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java --- @@ -18,9 +18,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer; --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45891225 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } + /** * Create a new Storm tuple from the given Flink tuple. The provided {@code nameIndexMap} is ignored for raw input * types. -* -* @param flinkTuple +* @param flinkTuple * The Flink tuple to be converted. * @param schema -* The schema (ie, ordered field names) of the tuple. +* @param producerComponentId */ --- End diff -- formatting; incomplete. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
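A sketch of how the constructor JavaDoc could be completed. The parameter names come from the diffs in this thread; the descriptions are illustrative, not taken from the source.

    /**
     * Create a new Storm tuple from the given Flink tuple. The provided {@code nameIndexMap} is ignored for raw input
     * types.
     *
     * @param flinkTuple
     *            The Flink tuple to be converted.
     * @param schema
     *            The schema (ie, ordered field names) of the tuple.
     * @param taskId
     *            The ID of the task that produced the tuple.
     * @param producerStreamId
     *            The ID of the stream the tuple was emitted on.
     * @param producerComponentId
     *            The ID of the spout or bolt that produced the tuple.
     */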
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45891117 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java --- @@ -33,7 +30,8 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.HashMap; --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45891069 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java --- @@ -18,7 +18,6 @@ package org.apache.flink.storm.wrappers; import backtype.storm.spout.ISpoutOutputCollector; - import org.apache.flink.api.java.tuple.Tuple0; --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45891019 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java --- @@ -17,12 +17,12 @@ package org.apache.flink.storm.wrappers; -import java.util.HashMap; - --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45890996 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45890935 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.wrappers; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.tuple.Fields; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collection; + +/** + * A {@link BoltWrapperTwoInput} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming + * program. In contrast to {@link BoltWrapper}, this wrapper takes two input stream as input. + */ +public class BoltWrapperTwoInput extends BoltWrapper implements TwoInputStreamOperator { + + /** The schema (ie, ordered field names) of the second input stream. */ + private final Fields inputSchema2; + + private final String componentId2; + private final String streamId2; + + /** +* Instantiates a new {@link BoltWrapperTwoInput} that wraps the given Storm {@link IRichBolt bolt} such that it can be +* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types +* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} +* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will +* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. +* @param bolt +*The Storm {@link IRichBolt bolt} to be used. +* @param boltId +* @param componentId2 +* @param streamId1 +* @param inputSchema1 +*The schema (ie, ordered field names) of the input stream. @throws IllegalArgumentException +* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if +* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range +* */ --- End diff -- formatting (space and stars) incomplete JavaDoc; missing `@throws` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
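A sketch of how the tags could be completed: give each visible parameter its own description and pull the fused @throws back out onto its own tag. The descriptions are illustrative, the [0;25] range is the one used by the single-input BoltWrapper, and any parameters cut off in the quote above would still need their own @param lines.

    * @param bolt
    *            The Storm {@link IRichBolt bolt} to be used.
    * @param boltId
    *            The name of the bolt within the topology.
    * @param streamId1
    *            The stream ID of the first input stream.
    * @param componentId2
    *            The ID of the component that produces the second input stream.
    * @param inputSchema1
    *            The schema (ie, ordered field names) of the first input stream.
    * @throws IllegalArgumentException
    *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
    *             {@code rawOuput} is {@code false} and the number of declared output attributes is not within range
    *             [0;25].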
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45890812 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.wrappers; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.tuple.Fields; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collection; + +/** + * A {@link BoltWrapperTwoInput} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming + * program. In contrast to {@link BoltWrapper}, this wrapper takes two input stream as input. + */ +public class BoltWrapperTwoInput extends BoltWrapper implements TwoInputStreamOperator { + + /** The schema (ie, ordered field names) of the second input stream. */ + private final Fields inputSchema2; + + private final String componentId2; + private final String streamId2; + --- End diff -- missing JavaDoc for both members --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
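A sketch of the member JavaDoc being asked for; the descriptions are illustrative, not taken from the source.

    /** The component ID of the producer of the second input stream. */
    private final String componentId2;

    /** The stream ID of the second input stream. */
    private final String streamId2;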
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45890399 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java --- @@ -108,20 +112,19 @@ public BoltWrapper(final IRichBolt bolt, final Fields inputSchema) * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. -* -* @param bolt +* @param bolt *The Storm {@link IRichBolt bolt} to be used. +* @param inputStreamId +* @param inputComponentId * @param rawOutputs *Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be -*of a raw type. -* @throws IllegalArgumentException --- End diff -- keep `@throws` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45890269 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java --- @@ -89,17 +94,16 @@ public BoltWrapper(final IRichBolt bolt) throws IllegalArgumentException { * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on * the bolt's declared number of attributes. -* -* @param bolt +* @param bolt *The Storm {@link IRichBolt bolt} to be used. +* @param inputStreamId +* @param inputComponentId * @param inputSchema -*The schema (ie, ordered field names) of the input stream. -* @throws IllegalArgumentException -* If the number of declared output attributes is not with range [0;25]. --- End diff -- Why do you delete `@throws`? More documentation is always better. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
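A sketch of the restored tag, reusing the wording of the deleted line quoted above (with "not with range" tightened to "not within range"):

    * @throws IllegalArgumentException
    *             If the number of declared output attributes is not within range [0;25].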
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45890289 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java --- @@ -108,20 +112,19 @@ public BoltWrapper(final IRichBolt bolt, final Fields inputSchema) * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. -* -* @param bolt +* @param bolt --- End diff -- space --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45890041 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java --- @@ -89,17 +94,16 @@ public BoltWrapper(final IRichBolt bolt) throws IllegalArgumentException { * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on * the bolt's declared number of attributes. -* -* @param bolt +* @param bolt *The Storm {@link IRichBolt bolt} to be used. --- End diff -- delete one space before `@param` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45889924 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java --- @@ -77,11 +80,13 @@ * * @param bolt *The Storm {@link IRichBolt bolt} to be used. +* @param inputStreamId +* @param inputComponentId --- End diff -- JavaDoc incomplete -- same below -- will not mark it again. Please complete everywhere. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45889838 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java --- @@ -53,21 +51,26 @@ private static final long serialVersionUID = -4788589118464155835L; /** The wrapped Storm {@link IRichBolt bolt}. */ - private final IRichBolt bolt; + protected final IRichBolt bolt; /** The name of the bolt. */ private final String name; /** Number of attributes of the bolt's output tuples per stream. */ - private final HashMap numberOfAttributes; + protected final HashMap numberOfAttributes; /** The schema (ie, ordered field names) of the input stream. */ - private final Fields inputSchema; + protected final Fields inputSchema; /** The original Storm topology. */ protected StormTopology stormTopology; + protected transient TopologyContext topologyContext; + + protected final String inputComponentId; + protected final String inputStreamId; + --- End diff -- Please add JavaDoc to those three. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
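A sketch of JavaDoc for the three members; the descriptions are illustrative, not taken from the source.

    /** The topology context of the wrapped bolt, set at runtime when the operator is opened. */
    protected transient TopologyContext topologyContext;

    /** The component ID of the producer of the bolt's input stream. */
    protected final String inputComponentId;

    /** The stream ID of the bolt's input stream. */
    protected final String inputStreamId;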
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45889473 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java --- @@ -19,7 +19,6 @@ import backtype.storm.task.IOutputCollector; import backtype.storm.tuple.Tuple; - import org.apache.flink.api.java.tuple.Tuple0; --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45889260 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java --- @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.print; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import storm.starter.bolt.PrinterBolt; +import storm.starter.spout.TwitterSampleSpout; + +import java.util.Arrays; + +/** + * Prints incoming tweets. Tweets can be filtered by keywords. + */ +public class PrintSampleStream { + public static void main(String[] args) throws Exception { --- End diff -- It shows how to run an existing Storm topology with Flink. It prints tweets from Twitter, which is kind of neat. It's also included in Storm. It's nice to have some examples other than WordCount. This was actually not working before this PR... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
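A rough sketch of the pattern such an example follows: build a plain Storm TopologyBuilder, translate it with FlinkTopology.createTopology, and submit it to a FlinkLocalCluster. The TwitterSampleSpout arguments, the getLocalCluster/submitTopology/shutdown calls, and the sleep are assumptions about the flink-storm API and storm-starter, not a copy of the file under review.

    TopologyBuilder builder = new TopologyBuilder();
    // credentials and keywords are placeholders
    builder.setSpout("twitter", new TwitterSampleSpout("key", "secret", "token", "tokenSecret",
            new String[] { "flink" }));
    builder.setBolt("print", new PrinterBolt()).shuffleGrouping("twitter");

    Config conf = new Config();
    FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology("PrintSampleStream", conf, FlinkTopology.createTopology(builder));

    // let the topology run for a while, then shut the local cluster down
    Utils.sleep(10 * 1000);
    cluster.shutdown();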
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45889142 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -15,75 +16,474 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.IRichStateSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology} - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or - * {@link FlinkClient}. + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program. + * CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported. 
*/ -public class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology { + + /** All declared streams and output schemas by operator ID */ + private final HashMap> outputStreams = new HashMap>(); + /** All spouts&bolts declarers by their ID */ + private final HashMap declarers = new HashMap(); + + private final HashMap>> unprocessdInputsPerBolt = + new HashMap>>(); + + final HashMap>> availableInputs = new HashMap<>(); - /** The number of declared tasks for the whole program (ie, sum over all dops) */ - private int numberOfTasks = 0; + private final TopologyBuilder builder; - public FlinkTopology() { - // Set default parallelism to 1, to mirror Storm default behavior - super.setParallelism(1); + // needs to be a class member for internal testing purpose + private final StormTopology stormTopology; + + private final Map spouts; + private final Map bolts; + + private final StreamExecutionEnvironment env; + + private FlinkTopology(TopologyBuilder builder) { + this.builder = builder; + this.stormTopology = builder.createTopology(); + // extract the spouts and bolts + this.spouts = getPrivateField("_spouts"); + this.bolts = getPrivateField("_bolts"); + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Kick off the translation immediately + translateTopology(); } /** -* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link -* FlinkClient}. * -* @throws UnsupportedOperationException -* at every invocation +* Creates a Flink program that uses the specified spouts and bolts. +* @param stormBuilder The storm topology builder to use for creating the Flink topology. +* @return A Flink Topology which may be executed. */ - @Override - pu
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r4594 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -15,75 +16,474 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.IRichStateSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology} - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or - * {@link FlinkClient}. + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program. + * CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported. 
*/ -public class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology { + + /** All declared streams and output schemas by operator ID */ + private final HashMap> outputStreams = new HashMap>(); + /** All spouts&bolts declarers by their ID */ + private final HashMap declarers = new HashMap(); + + private final HashMap>> unprocessdInputsPerBolt = + new HashMap>>(); + + final HashMap>> availableInputs = new HashMap<>(); - /** The number of declared tasks for the whole program (ie, sum over all dops) */ - private int numberOfTasks = 0; + private final TopologyBuilder builder; - public FlinkTopology() { - // Set default parallelism to 1, to mirror Storm default behavior - super.setParallelism(1); + // needs to be a class member for internal testing purpose + private final StormTopology stormTopology; + + private final Map spouts; + private final Map bolts; + + private final StreamExecutionEnvironment env; + + private FlinkTopology(TopologyBuilder builder) { + this.builder = builder; + this.stormTopology = builder.createTopology(); + // extract the spouts and bolts + this.spouts = getPrivateField("_spouts"); + this.bolts = getPrivateField("_bolts"); + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Kick off the translation immediately + translateTopology(); } /** -* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link -* FlinkClient}. * -* @throws UnsupportedOperationException -* at every invocation +* Creates a Flink program that uses the specified spouts and bolts. +* @param stormBuilder The storm topology builder to use for creating the Flink topology. +* @return A Flink Topology which may be executed. */ - @Override - pu
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45888777 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -15,75 +16,474 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.IRichStateSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology} - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or - * {@link FlinkClient}. + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program. + * CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported. 
*/ -public class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology { + + /** All declared streams and output schemas by operator ID */ + private final HashMap> outputStreams = new HashMap>(); + /** All spouts&bolts declarers by their ID */ + private final HashMap declarers = new HashMap(); + + private final HashMap>> unprocessdInputsPerBolt = + new HashMap>>(); + + final HashMap>> availableInputs = new HashMap<>(); - /** The number of declared tasks for the whole program (ie, sum over all dops) */ - private int numberOfTasks = 0; + private final TopologyBuilder builder; - public FlinkTopology() { - // Set default parallelism to 1, to mirror Storm default behavior - super.setParallelism(1); + // needs to be a class member for internal testing purpose + private final StormTopology stormTopology; + + private final Map spouts; + private final Map bolts; + + private final StreamExecutionEnvironment env; + + private FlinkTopology(TopologyBuilder builder) { + this.builder = builder; + this.stormTopology = builder.createTopology(); + // extract the spouts and bolts + this.spouts = getPrivateField("_spouts"); + this.bolts = getPrivateField("_bolts"); + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Kick off the translation immediately + translateTopology(); } /** -* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link -* FlinkClient}. * -* @throws UnsupportedOperationException -* at every invocation +* Creates a Flink program that uses the specified spouts and bolts. +* @param stormBuilder The storm topology builder to use for creating the Flink topology. +* @return A Flink Topology which may be executed. --- End diff -- typo: Strom
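For readers skimming the quoted diff: the new `FlinkTopology` constructor pulls the spout and bolt maps out of the `TopologyBuilder` reflectively via `getPrivateField("_spouts")` / `getPrivateField("_bolts")`. The following is a rough, self-contained sketch of how such a helper can look; the field names come from the quoted diff, everything else (class name, exception handling) is an assumption, not the PR's actual code.

```java
import backtype.storm.topology.TopologyBuilder;

import java.lang.reflect.Field;
import java.util.Map;

// Hypothetical sketch of the reflective lookup used by the FlinkTopology constructor above.
final class TopologyBuilderReflectionSketch {

	@SuppressWarnings("unchecked")
	static <T> Map<String, T> getPrivateField(TopologyBuilder builder, String fieldName) {
		try {
			// "_spouts" and "_bolts" are private members of Storm's TopologyBuilder
			Field field = builder.getClass().getDeclaredField(fieldName);
			field.setAccessible(true);
			return (Map<String, T>) field.get(builder);
		} catch (NoSuchFieldException | IllegalAccessException e) {
			throw new RuntimeException("Could not read TopologyBuilder." + fieldName, e);
		}
	}
}
```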
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45887969 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java --- @@ -20,11 +20,9 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45887487 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java --- @@ -99,6 +111,7 @@ public void rebalance(final String name, final RebalanceOptions options) { public void shutdown() { flink.stop(); + flink = null; --- End diff -- Should be kept. Otherwise, calling `submitTopologyWithOpts` a second time will run into an NPE. (Or add a proper exception in the `else` branch of the `if (flink == null)` check, i.e., "Cannot run topology. Cluster got shut down." or similar.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
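To make the suggested alternative concrete, here is a minimal, hypothetical sketch of the fail-fast variant mjsax mentions; the class, field type, and message are placeholders, not the actual FlinkLocalCluster code.

```java
// Hypothetical sketch only: 'flink' stands in for whatever handle FlinkLocalCluster keeps
// for the running local cluster; shutdown() clears it so later submissions fail clearly.
public class ShutdownGuardSketch {

	private AutoCloseable flink; // placeholder for the running local cluster handle

	public void shutdown() throws Exception {
		flink.close();
		flink = null; // mark the cluster as gone
	}

	public void submitTopology(String topologyName) {
		if (flink == null) {
			// fail fast instead of hitting an NPE deeper in the submission path
			throw new IllegalStateException(
					"Cannot run topology '" + topologyName + "': cluster got shut down.");
		}
		// ... submit to the running cluster ...
	}
}
```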
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45886799 --- Diff: flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java --- @@ -17,8 +17,6 @@ */ package org.apache.flink.storm.split; -import java.util.Map; - --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45886615 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java --- @@ -17,10 +17,9 @@ package org.apache.flink.storm.wordcount.operators; -import org.apache.flink.storm.util.FileSpout; - --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45886597 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java --- @@ -17,10 +17,10 @@ package org.apache.flink.storm.wordcount.operators; -import java.io.Serializable; - import org.apache.flink.examples.java.wordcount.util.WordCountData; --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45886536 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java --- @@ -19,7 +19,6 @@ import backtype.storm.topology.IRichSpout; import backtype.storm.utils.Utils; - --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45886387 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java --- @@ -57,16 +57,13 @@ public static void main(final String[] args) throws Exception { } // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); + final TopologyBuilder builder = WordCountTopology.buildTopology(false); --- End diff -- Please remove `false` -- this test should use index (and not name) to specify the key. `WordCountLocalByName` does it the other way round such that both cases are covered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
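As a generic Flink-side illustration of the index-versus-name distinction the comment refers to (this is not the example's actual code; the `Tuple2` stream and field expression are assumptions):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Sketch: keying a (word, count) stream either by position or by field name.
public final class KeyByIndexVsNameSketch {

	public static KeyedStream<Tuple2<String, Integer>, ?> byIndex(
			DataStream<Tuple2<String, Integer>> counts) {
		return counts.keyBy(0); // position-based key (the case WordCountLocal should cover)
	}

	public static KeyedStream<Tuple2<String, Integer>, ?> byName(
			DataStream<Tuple2<String, Integer>> counts) {
		return counts.keyBy("f0"); // name-based key (the case WordCountLocalByName covers)
	}
}
```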
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45885615 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java --- @@ -17,9 +17,6 @@ */ package org.apache.flink.storm.split.operators; -import java.util.Map; -import java.util.Random; - --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45885626 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java --- @@ -17,8 +17,6 @@ */ package org.apache.flink.storm.split.operators; -import java.util.Map; - --- End diff -- pure reformatting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45885529 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java --- @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.print; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import storm.starter.bolt.PrinterBolt; +import storm.starter.spout.TwitterSampleSpout; + +import java.util.Arrays; + +/** + * Prints incoming tweets. Tweets can be filtered by keywords. + */ +public class PrintSampleStream { + public static void main(String[] args) throws Exception { --- End diff -- What is the purpose of this example? Does it show anything special? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1398#issuecomment-159641971 I've rebased to the latest master and addressed your comments. I would like to merge this and programmatically fix the multiple inputs issue afterwards. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45854121 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java --- @@ -150,7 +153,7 @@ static synchronized TopologyContext createTopologyContext( } stormTopology = new StormTopology(spouts, bolts, new HashMap()); - taskId = context.getIndexOfThisSubtask(); + taskId = context.getIndexOfThisSubtask() + 1; --- End diff -- Actually, it doesn't matter. I added the +1 before changing the topology parsing logic; some topologies would only run with this fix. That has since been fixed, so the +1 is not necessary anymore. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45758926 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -180,8 +178,6 @@ public void testCreateTopologyContext() { builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); --- End diff -- Don't get me wrong. I don't want to discard your work! And I believe that you did not intend to end up with a "messy" PR. But that's the current state. I think we can refine and merge it, but it does not resolve FLINK-2837 even if it improves on it. I would also assume that your union code will be reworked heavily later on... Not sure about your tuple meta information code; I need to have a look in detail. That is why I had the idea to apply only the discussed API changes in a single PR. But if this is too complex, we should just carry on with this PR. Btw: even if the JIRA is quite old, it is not assigned to you; thus you should have asked about it. You did the same with FLINK-2837, which was assigned to me, too -- I had not worked on it yet, so I assigned it to you (I thought, as you had the union code together with the API changes, that should be fine). Additionally, the reason I just assigned it to you was that FLINK-2837 is actually a requirement for FLINK-2721. That is why I stopped working on it back then, but did not have time to fix FLINK-2837 either. I did not assume that you would tackle the join case, which does require the tuple meta info... A regular union does not require it. Anyway, let us just get this PR done. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45747566 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -180,8 +178,6 @@ public void testCreateTopologyContext() { builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); --- End diff -- I was actually not specifically trying to address JIRA issues; I just fixed everything I discovered along the way while trying out the compatibility layer. Only after fixing did I realize there are open JIRA issues. One is assigned to me (FLINK-2837) and the other one (FLINK-2721) has been open for two months. I think it would be a shame not to merge this pull request soon; it provides a good foundation for addressing any further issues. Splitting this PR would not be trivial with all the changes. I already accommodated you with the API changes. Also, I would like to address most of your comments, but I'm not too inclined to split up this PR (if that is even possible). Could you base your work on this pull request and do a follow-up? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45744427 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -180,8 +178,6 @@ public void testCreateTopologyContext() { builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); --- End diff -- I see your point. But this PR might be too big anyway. You try to do 3 things at the same time (two are backed by a JIRA). How hard would it be to split this PR? Last but not least, the multi-input-stream JIRA is not resolved by this. [And the second JIRA you try to resolve is assigned to me, and I have already worked on it -- I would actually like to finish my work on it.] --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45743505 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -180,8 +178,6 @@ public void testCreateTopologyContext() { builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); --- End diff -- Well, "get it right from the beginning" -- I think it was anything but right until now :) And in this regard, it's much more defined now: at least you get an error if you have more than two inputs. I agree that we should fix this, but it's going to be a bit tricky because we have to hack around Flink's limitation. I would rather not do this in this pull request. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45742093 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -180,8 +178,6 @@ public void testCreateTopologyContext() { builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); --- End diff -- Well, from a Storm point of view there is only `union`. As it is the generic Storm case, it includes the join case. I guess your specialized join solution would become obsolete once generic union is supported. Therefore, I would prefer to get it right from the beginning... My idea would be to try to get rid of `TwoInputBoltWrapper` and "union" the incoming streams somehow to feed a single stream to `BoltWrapper`. The tricky part is that we cannot use Flink's `union` because it assumes the same input type, but Storm can union different types into one stream... What do you think about this idea? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
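Since Flink's `union` requires a single element type, one conceivable direction for the idea sketched above is to lift each input into a shared carrier type first and union the lifted streams. The sketch below is purely illustrative: the carrier class and helper are invented for this note and are not part of the PR.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// Invented helper: tag every record with its Storm stream id and erase its concrete type,
// so two differently typed inputs can be union-ed into one stream for a single BoltWrapper.
public final class HeterogeneousUnionSketch {

	/** Simple carrier POJO; a real solution might reuse a SplitStreamType-like wrapper. */
	public static class Tagged {
		public String streamId;
		public Object value;
	}

	public static <A, B> DataStream<Tagged> unionDifferentTypes(
			DataStream<A> input1, String streamId1,
			DataStream<B> input2, String streamId2) {
		return tag(input1, streamId1).union(tag(input2, streamId2));
	}

	private static <T> DataStream<Tagged> tag(DataStream<T> input, final String streamId) {
		return input.map(new MapFunction<T, Tagged>() {
			@Override
			public Tagged map(T record) {
				Tagged tagged = new Tagged();
				tagged.streamId = streamId;
				tagged.value = record;
				return tagged;
			}
		});
	}
}
```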
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45741402 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java --- @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.join; + +import backtype.storm.Config; +import backtype.storm.testing.FeederSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.storm.util.BoltFileSink; +import org.apache.flink.storm.util.TupleOutputFormatter; +import storm.starter.bolt.PrinterBolt; +import storm.starter.bolt.SingleJoinBolt; + + +public class SingleJoinExample { + + public static void main(String[] args) throws Exception { + final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); + final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); + --- End diff -- Okay should be doable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45741186 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -180,8 +178,6 @@ public void testCreateTopologyContext() { builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); --- End diff -- Agreed. My concern was to get the join working but union should also be supported for same data output types. Could we do that in a follow-up? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45740805 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java --- @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.join; + +import backtype.storm.Config; +import backtype.storm.testing.FeederSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.storm.util.BoltFileSink; +import org.apache.flink.storm.util.TupleOutputFormatter; +import storm.starter.bolt.PrinterBolt; +import storm.starter.bolt.SingleJoinBolt; + + +public class SingleJoinExample { + + public static void main(String[] args) throws Exception { + final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); + final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); + --- End diff -- Yes. :) For example, 2 fields for one input and 3 fields for the other input. (The idea is to have a different number of fields for the two inputs, to make sure the input schemas can differ not only in their types but also in their number of attributes.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
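A tiny sketch of what the suggested change could look like; the extra "country" field and the fed values are made up for illustration and are not part of the example being reviewed.

```java
import backtype.storm.testing.FeederSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Two join inputs with a different number of attributes, as suggested in the comment above.
public class DifferentAritySpoutsSketch {
	public static void main(String[] args) {
		FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
		FeederSpout ageSpout = new FeederSpout(new Fields("id", "age", "country"));

		// feed one record into each spout, matching the declared arity
		genderSpout.feed(new Values(1, "male"));
		ageSpout.feed(new Values(1, 42, "DE"));
	}
}
```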
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45740238 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java --- @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.join; + +import backtype.storm.Config; +import backtype.storm.testing.FeederSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.storm.util.BoltFileSink; +import org.apache.flink.storm.util.TupleOutputFormatter; +import storm.starter.bolt.PrinterBolt; +import storm.starter.bolt.SingleJoinBolt; + + +public class SingleJoinExample { + + public static void main(String[] args) throws Exception { + final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); + final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); + --- End diff -- You mean different number of fields? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45735762 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java --- @@ -18,20 +18,23 @@ package org.apache.flink.storm.util; import backtype.storm.task.TopologyContext; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import java.io.BufferedWriter; -import java.io.FileWriter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.util.Map; /** - * Implements a sink that write the received data to the given file (as a result of {@code Object.toString()} for each + * Implements a sink that writes the received data to the given file (as a result of {@code Object.toString()} for each * attribute). */ public final class BoltFileSink extends AbstractBoltSink { private static final long serialVersionUID = 2014027288631273666L; - private final String path; + private final Path path; private BufferedWriter writer; --- End diff -- No. People will have spout/bolt code they do not want to touch when running it in Flink. Thus their code will be written in the same way -- and so should the examples be. Otherwise, we give the impression they need to change their code -- but they don't. Thus, we implement the example Spouts/Bolts in a pure Storm way. Of course, if somebody develops a new Spout/Bolt with Flink in mind, your approach makes sense. However, this is not the main focus (it would be even better if they coded new stuff Flink-natively in embedded mode, instead of developing Spouts/Bolts which are tailored to Flink). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
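To illustrate the "pure Storm" style argued for here, a minimal file-writing helper that uses only `java.io` and no Flink classes. This is a sketch under the assumption that the sink only needs open/write/close hooks; it is not the actual BoltFileSink implementation.

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

// Plain-Java file writing, the way a Flink-agnostic example bolt would do it.
public class PlainFileWriterSketch {

	private final String path;
	private BufferedWriter writer;

	public PlainFileWriterSketch(String path) {
		this.path = path;
	}

	public void open() throws IOException {
		writer = new BufferedWriter(new FileWriter(path));
	}

	public void writeLine(String line) throws IOException {
		writer.write(line);
		writer.newLine();
	}

	public void close() throws IOException {
		if (writer != null) {
			writer.close();
		}
	}
}
```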
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45734993 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java --- @@ -38,6 +38,8 @@ protected String path = null; protected BufferedReader reader; + protected boolean finished; + public FileSpout() {} --- End diff -- The point is the following: `FileSpout` is a Flink-agnostic implementation that is extended in a Flink-aware way by `FiniteFileSpout`. Thus, `FileSpout` should be implemented the Storm way, without any knowledge of Flink, and `FiniteFileSpout` should use Flink stuff but not change the code of `FileSpout`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
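A rough sketch of the layering described above: the base `FileSpout` stays pure Storm and only the subclass knows about Flink. Apart from `FileSpout` and the `FiniteSpout` interface (both from `org.apache.flink.storm.util`), the member names and logic here are assumptions, not the real `FiniteFileSpout` code.

```java
import org.apache.flink.storm.util.FileSpout;
import org.apache.flink.storm.util.FiniteSpout;

// Sketch: the end-of-input flag and the FiniteSpout contract live in the subclass,
// leaving the Flink-agnostic FileSpout untouched.
public class FiniteFileSpoutSketch extends FileSpout implements FiniteSpout {

	private boolean finished; // kept out of the Flink-agnostic base class

	@Override
	public void nextTuple() {
		super.nextTuple(); // plain Storm behavior stays untouched
		// a real implementation would set 'finished' once the file is fully consumed
	}

	@Override
	public boolean reachedEnd() {
		return finished;
	}
}
```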
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45734441 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java --- @@ -18,20 +18,23 @@ package org.apache.flink.storm.util; import backtype.storm.task.TopologyContext; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import java.io.BufferedWriter; -import java.io.FileWriter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.util.Map; /** - * Implements a sink that write the received data to the given file (as a result of {@code Object.toString()} for each + * Implements a sink that writes the received data to the given file (as a result of {@code Object.toString()} for each * attribute). */ public final class BoltFileSink extends AbstractBoltSink { private static final long serialVersionUID = 2014027288631273666L; - private final String path; + private final Path path; private BufferedWriter writer; --- End diff -- So people download Flink and learn about the Storm compatibility layer to write spouts/bolts which they use in Storm topologies that run without Flink? That is beyond my imagination :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---