[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-02 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1398#issuecomment-161412796
  
Thanks for your patience. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-02 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1398#issuecomment-161304907
  
Thanks for your feedback!




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1398




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1398#issuecomment-160985940
  
If Travis is green, please merge. You can fix the last tiny comments 
directly before merging; no need to update this PR.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46284171
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 ---
@@ -108,16 +114,13 @@ public BoltWrapper(final IRichBolt bolt, final Fields 
inputSchema)
 * for POJO input types. The output type can be any type if parameter 
{@code rawOutput} is {@code true} and the
 * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
 * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-* 
-* @param bolt
-*The Storm {@link IRichBolt bolt} to be used.
+* @param bolt The Storm {@link IRichBolt bolt} to be used.
 * @param rawOutputs
 *Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
 *of a raw type.
 * @throws IllegalArgumentException
 * If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
 * {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-* [1;25].
--- End diff --

Please keep and update: should be `[0;25]`
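
For reference, a sketch of how the kept and updated clause could read (wording 
reconstructed from the quoted Javadoc; the final merged text may differ, and the 
`rawOuput` typo is corrected to `rawOutput` here):

    /**
     * @param bolt The Storm {@link IRichBolt bolt} to be used.
     * @param rawOutputs
     *            Contains stream names if a single attribute output stream should not be of type
     *            {@link Tuple1} but be of a raw type.
     * @throws IllegalArgumentException
     *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1,
     *             or if {@code rawOutput} is {@code false} and the number of declared output attributes is not
     *             within range [0;25].
     */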




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46284474
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +47,32 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   /** The task id where this tuple is processed */
+   private final int taskId;
+   /** The producer of this tuple */
+   private final String producerStreamId;
+   /** The producer's component id of this tuple */
+   private final String producerComponentId;
+   /*+ The message that is associated with this tuple */
--- End diff --

`/**` not `+`
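
The corrected marker, for clarity (the field declaration is taken from the earlier 
revision of this diff):

    /** The message that is associated with this tuple */
    private final MessageId id;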




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46284023
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 ---
@@ -89,13 +99,9 @@ public BoltWrapper(final IRichBolt bolt) throws 
IllegalArgumentException {
 * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
 * {@link Tuple0} to {@link Tuple25}. The output type will be one of 
{@link Tuple0} to {@link Tuple25} depending on
 * the bolt's declared number of attributes.
-* 
-* @param bolt
-*The Storm {@link IRichBolt bolt} to be used.
+* @param bolt The Storm {@link IRichBolt bolt} to be used.
 * @param inputSchema
-*The schema (ie, ordered field names) of the input stream.
-* @throws IllegalArgumentException
--- End diff --

Please keep `@throws`
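
A sketch of the restored tags for this constructor; the `@param` lines follow the 
quoted diff, while the `@throws` description is an assumption since it is cut off in 
the quote:

    /**
     * @param bolt The Storm {@link IRichBolt bolt} to be used.
     * @param inputSchema
     *            The schema (ie, ordered field names) of the input stream.
     * @throws IllegalArgumentException
     *             If the number of declared output attributes is not within range [0;25]. (assumed wording)
     */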




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1398#issuecomment-160982088
  
I've addressed your comments.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46281582
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 ---
@@ -265,12 +264,12 @@ public void testOpen() throws Exception {
@Test
public void testOpenSink() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
-   BoltWrapper wrapper = new BoltWrapper(bolt);
+   BoltWrapper wrapper = new BoltWrapper(bolt, "stream", "component");

wrapper.setup(createMockStreamTask(), new StreamConfig(new 
Configuration()), mock(Output.class));
wrapper.open();

-   verify(bolt).prepare(any(Map.class), 
any(TopologyContext.class), any(OutputCollector.class));
+   verify(bolt).prepare(any(Map.class), 
any(TopologyContext.class), isNotNull(OutputCollector.class));
--- End diff --

I see. Makes sense.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46279370
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
 ---
@@ -72,7 +71,7 @@ public static void main(final String[] args) throws 
Exception {
.transform("StormBoltTokenizer",
TypeExtractor.getForObject(""),
new BoltWrapper(new ExclamationBolt(),
-   new String[] { 
Utils.DEFAULT_STREAM_ID }))
+   "stream", 
"component", new String[] { Utils.DEFAULT_STREAM_ID }))
--- End diff --

Can do.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46278994
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -193,24 +189,22 @@ public void testCreateTopologyContext() {
Utils.sleep(++counter * 1);
cluster.shutdown();
 
-   if (TestSink.result.size() == 8) {
+   if (TestSink.result.size() >= 4) {
--- End diff --

The Storm executor sometimes returned more results for me. I've adjusted it 
to a fixed size again. I think the important thing here is that we check all 
the returned results.
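
A minimal sketch of the described check, assuming JUnit's `Assert`, that 
`TestSink.result` is a plain `java.util.List` as in the quoted test, and that the 
fixed expected size is 8 (taken from the original `== 8` check); the per-result 
assertion is only a placeholder:

    // check the fixed expected size, then every single returned result
    Assert.assertEquals(8, TestSink.result.size());
    for (Object result : TestSink.result) {
        Assert.assertNotNull(result); // stands in for the real per-result assertions
    }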




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46277213
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 ---
@@ -265,12 +264,12 @@ public void testOpen() throws Exception {
@Test
public void testOpenSink() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
-   BoltWrapper wrapper = new BoltWrapper(bolt);
+   BoltWrapper wrapper = new BoltWrapper(bolt, "stream", "component");

wrapper.setup(createMockStreamTask(), new StreamConfig(new 
Configuration()), mock(Output.class));
wrapper.open();

-   verify(bolt).prepare(any(Map.class), 
any(TopologyContext.class), any(OutputCollector.class));
+   verify(bolt).prepare(any(Map.class), 
any(TopologyContext.class), isNotNull(OutputCollector.class));
--- End diff --

`any` also matches null (literally anything), but here I want to explicitly 
check `isNotNull`.
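
For illustration, the two matcher variants from the quoted test side by side; 
Mockito's `isNotNull(Class)` only accepts a non-null `OutputCollector`, while 
`any(Class)` would also accept null:

    // matches any argument, including null
    verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));

    // matches only a non-null OutputCollector -- the property this test wants to pin down
    verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class));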




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-12-01 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46276898
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
 ---
@@ -224,7 +224,7 @@ static synchronized TopologyContext 
createTopologyContext(
 *OUTPUT: A map from all component IDs to there output 
streams and output fields.
 * 
 * @return A unique task ID if the currently processed Spout or Bolt 
({@code componentId}) is equal to the current
-* Flink operator ({@link operatorName}) -- {@code null} 
otherwise.
+* Flink operator ({@param operatorName}) -- {@code null} 
otherwise.
--- End diff --

Ok, let's use code.


http://stackoverflow.com/questions/1667212/reference-a-method-parameter-in-javadoc
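
A sketch of the agreed form: `{@param}` is not a valid inline Javadoc tag and 
`{@link}` only resolves types and members, so the parameter name is referenced with 
plain `{@code}`:

    /**
     * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the
     *         current Flink operator ({@code operatorName}) -- {@code null} otherwise.
     */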




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-27 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46035298
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
 ---
@@ -224,7 +224,7 @@ static synchronized TopologyContext 
createTopologyContext(
 *OUTPUT: A map from all component IDs to there output 
streams and output fields.
 * 
 * @return A unique task ID if the currently processed Spout or Bolt 
({@code componentId}) is equal to the current
-* Flink operator ({@link operatorName}) -- {@code null} 
otherwise.
+* Flink operator ({@param operatorName}) -- {@code null} 
otherwise.
--- End diff --

I guess `@link` is wrong, but does `@param` work? Maybe `@code` would be 
correct? But I am not sure.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-27 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46035118
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 ---
@@ -265,12 +264,12 @@ public void testOpen() throws Exception {
@Test
public void testOpenSink() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
-   BoltWrapper wrapper = new BoltWrapper(bolt);
+   BoltWrapper wrapper = new BoltWrapper(bolt, "stream", "component");

wrapper.setup(createMockStreamTask(), new StreamConfig(new 
Configuration()), mock(Output.class));
wrapper.open();

-   verify(bolt).prepare(any(Map.class), 
any(TopologyContext.class), any(OutputCollector.class));
+   verify(bolt).prepare(any(Map.class), 
any(TopologyContext.class), isNotNull(OutputCollector.class));
--- End diff --

Just out of curiosity: Why `isNotNull` instead of `any`? 




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-27 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46034951
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -193,24 +189,22 @@ public void testCreateTopologyContext() {
Utils.sleep(++counter * 1);
cluster.shutdown();
 
-   if (TestSink.result.size() == 8) {
+   if (TestSink.result.size() >= 4) {
--- End diff --

Why `>=` and not `==` ?




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-27 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46033750
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
 ---
@@ -64,7 +63,7 @@ public static void main(final String[] args) throws 
Exception {
// this is done by a bolt that is wrapped 
accordingly
.transform("BoltTokenizer",
TypeExtractor.getForObject(new 
Tuple2("", 0)),
-   new BoltWrapper>(new BoltTokenizer()))
+   new BoltWrapper>(new BoltTokenizer(), "stream", "component"))
--- End diff --

Same question here.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-27 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46033770
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
 ---
@@ -71,7 +70,7 @@ public static void main(final String[] args) throws 
Exception {
// this is done by a bolt that is wrapped 
accordingly
.transform("BoltTokenizerPojo",
TypeExtractor.getForObject(new 
Tuple2("", 0)),
-   new BoltWrapper>(new BoltTokenizerByName()))
+   new BoltWrapper>(new BoltTokenizerByName(), "stream", "component"))
--- End diff --

Again.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-27 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46033787
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
 ---
@@ -75,7 +74,7 @@ public static void main(final String[] args) throws 
Exception {
"BoltTokenizerWithNames",
TypeExtractor.getForObject(new 
Tuple2("", 0)),
new BoltWrapper, 
Tuple2>(
-   new 
BoltTokenizerByName(), new Fields("sentence")))
+   new 
BoltTokenizerByName(), "stream", "component", new Fields("sentence")))
--- End diff --

Again.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-27 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46033705
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
 ---
@@ -70,7 +70,7 @@ public static void main(final String[] args) throws 
Exception {
oddStream.transform("oddBolt",
TypeExtractor.getForObject(new Tuple2("", 0)),
new BoltWrapper, 
Tuple2>(
-   new VerifyAndEnrichBolt(false)))
+   new VerifyAndEnrichBolt(false), 
"stream", "component"))
--- End diff --

Default values for both? Or use `ODD_STREAM` for consistency.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-27 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r46033582
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
 ---
@@ -72,7 +71,7 @@ public static void main(final String[] args) throws 
Exception {
.transform("StormBoltTokenizer",
TypeExtractor.getForObject(""),
new BoltWrapper(new ExclamationBolt(),
-   new String[] { 
Utils.DEFAULT_STREAM_ID }))
+   "stream", 
"component", new String[] { Utils.DEFAULT_STREAM_ID }))
--- End diff --

Can't we use default values for streamID and componentID here, ie, omit 
both parameters?




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45971856
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Tweets can be filtered by keywords.
+ */
+public class PrintSampleStream {
+   public static void main(String[] args) throws Exception {
--- End diff --

The problem was that the `BoltWrapper` wouldn't create a `BoltCollector` if 
the bolt didn't define any output fields. That led to a NullPointerException.
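
A simplified, hypothetical illustration of that failure mode -- not the actual Flink 
classes, and the shape of the fix is assumed:

    // A wrapper that only creates its collector when output fields are declared hands
    // a null collector to sink bolts, which then fail with a NullPointerException.
    final class CollectorGuardSketch {
        interface Collector { void emit(String value); }

        // buggy variant: returns null for bolts without declared output fields
        static Collector buggy(int declaredOutputFields) {
            return declaredOutputFields > 0 ? System.out::println : null;
        }

        // sketched fix: always return a collector, falling back to a no-op for sinks
        static Collector fixed(int declaredOutputFields) {
            return declaredOutputFields > 0 ? System.out::println : value -> { };
        }

        public static void main(String[] args) {
            fixed(0).emit("tweet");  // works for a sink bolt
            buggy(0).emit("tweet");  // throws NullPointerException
        }
    }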




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45971476
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

Fair enough, I use default id constants now.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45970656
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

Ok. I guess it would make sense to use Storm's `Utils.DEFAULT_STREAM_ID` 
here? And maybe add a `public final static String DEFAULT_OPERATOR_ID` variable 
to `StormTuple`? What about using "defaultID" or "unspecified" instead of 
"componentID" or similar, just so it is clear when the name shows up in the UI?




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45970389
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

Good, we are on the same page. And I don't want to bully you! I only 
mentioned the classes that do not contain any actual code change -- actually, 
according to the coding guidelines, there should be no import-order changes even 
in the classes with code changes, but I did not comment on those, only on the 
classes with pure reformatting. I like consistency, so please apply the changes 
to all classes. But when I did import reorderings or made code formatting 
consistent (where it was inconsistent), I was always told "don't do this". So if 
it is a general rule, I just point it out here, too. I did not come up with the 
rule, and I never force my own code style -- I always adapt to the given style. 
:) It is really about time to get a proper Maven formatting tool running so we 
can get rid of all these pointless discussions. (As I said already: "It is not 
against you or the change itself" -- but the process seems to be inconsistent; 
people follow the rules more or less strictly.)




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45967336
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

Not sure who is bullying whom :) Look at the classes and you will see that 
all imports are arranged like this. We want to be consistent, right? Following 
your suggestion, I changed the other import statements that were pure 
reformatting.

Open source is often about compromises. Very rarely will you find that another 
person's code style reflects exactly how you would do it. I'm making compromises 
and changing things the way you like them. That's fine with me. Please don't 
give me a harder time by blaming my employer; I'm not aware that I have done 
anything like that to you. Next time you get blamed for something like this, 
please contact me and I'll try to help you. I don't think this is the right 
place to sort these things out.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45966621
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
 ---
@@ -20,11 +20,9 @@
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45965523
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

I see. dataArtisans committers can make any change, but external committers get 
bullied if they apply similar changes... It is not against you or the change 
itself -- it unifies the style, which does make sense. But I got bullied 
multiple times in other PRs when I did similar things...




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45965316
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Tweets can be filtered by keywords.
+ */
+public class PrintSampleStream {
+   public static void main(String[] args) throws Exception {
--- End diff --

I cannot see why it did not work before. Can you explain what the problem 
was?




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964853
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
 ---
@@ -26,6 +24,8 @@
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+import java.util.Map;
+
 public class TestDummyBolt implements IRichBolt {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964876
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
 ---
@@ -16,16 +16,16 @@
  */
 package org.apache.flink.storm.util;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 public class TestSink implements IRichBolt {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964832
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
 ---
@@ -16,13 +16,13 @@
  */
 package org.apache.flink.storm.api;
 
-import java.util.Map;
-
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.OutputFieldsDeclarer;
 
+import java.util.Map;
+
 public class TestSpout implements IRichSpout {
private static final long serialVersionUID = -4884029383198924007L;
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964866
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
 ---
@@ -26,6 +24,8 @@
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
+import java.util.Map;
+
 public class TestDummySpout implements IRichSpout {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964904
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
 ---
@@ -21,7 +21,6 @@
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
-
 import org.apache.flink.api.common.ExecutionConfig;
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964776
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
 ---
@@ -16,14 +16,14 @@
  */
 package org.apache.flink.storm.api;
 
-import java.util.Map;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 
+import java.util.Map;
+
 public class TestBolt implements IRichBolt {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964692
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
-   final FlinkTopology topology = new FlinkTopology();
-   Assert.assertEquals(1, topology.getParallelism());
+   @Test(expected = RuntimeException.class)
+   public void testUnknowSpout() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecute() throws Exception {
-   new FlinkTopology().execute();
+   @Test(expected = RuntimeException.class)
+   public void testUnknowBolt() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt1", new 
TestBolt()).shuffleGrouping("spout");
+   builder.setBolt("bolt2", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecuteWithName() throws Exception {
-   new FlinkTopology().execute(null);
+   @Test(expected = RuntimeException.class)
+   public void testUndeclaredStream() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("spout");
+
+   FlinkTopology.createTopology(builder);
}
 
@Test
-   public void testNumberOfTasks() {
-   final FlinkTopology topology = new FlinkTopology();
+   @Ignore
+   public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
+   TopologyBuilder builder = new TopologyBuilder();
 
-   Assert.assertEquals(0, topology.getNumberOfTasks());
+   builder.setSpout("spout", new TestDummySpout());
+   builder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
+   TestDummySpout.spoutStreamId, new Fields("id"));
 
-   topology.increaseNumberOfTasks(3);
-   Assert.assertEquals(3, topology.getNumberOfTasks());
+   FlinkTopology.createTopology(builder);
+   }
 
-   topology.increaseNumberOfTasks(2);
-   Assert.assertEquals(5, topology.getNumberOfTasks());
+   @Test
+   @Ignore
--- End diff --

ok




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964610
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
-   final FlinkTopology topology = new FlinkTopology();
-   Assert.assertEquals(1, topology.getParallelism());
+   @Test(expected = RuntimeException.class)
+   public void testUnknowSpout() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecute() throws Exception {
-   new FlinkTopology().execute();
+   @Test(expected = RuntimeException.class)
+   public void testUnknowBolt() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt1", new 
TestBolt()).shuffleGrouping("spout");
+   builder.setBolt("bolt2", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecuteWithName() throws Exception {
-   new FlinkTopology().execute(null);
+   @Test(expected = RuntimeException.class)
+   public void testUndeclaredStream() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("spout");
+
+   FlinkTopology.createTopology(builder);
}
 
@Test
-   public void testNumberOfTasks() {
-   final FlinkTopology topology = new FlinkTopology();
+   @Ignore
--- End diff --

ok




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964241
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
 ---
@@ -18,9 +18,7 @@
 
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964023
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
 ---
@@ -17,12 +17,12 @@
 
 package org.apache.flink.storm.wrappers;
 
-import java.util.HashMap;
-
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45963943
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
 ---
@@ -33,7 +30,8 @@
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashMap;
 
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45963933
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

The use of null is often problematic. I prefer default values.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45963921
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45896492
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
@@ -15,75 +16,474 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
+ * CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+   /** All declared streams and output schemas by operator ID */
+   private final HashMap> outputStreams = 
new HashMap>();
+   /** All spouts&bolts declarers by their ID */
+   private final HashMap declarers = 
new HashMap();
+
+   private final HashMap>> 
unprocessdInputsPerBolt =
+   new HashMap>>();
+
+   final HashMap>> 
availableInputs = new HashMap<>();
 
-   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
-   private int numberOfTasks = 0;
+   private final TopologyBuilder builder;
 
-   public FlinkTopology() {
-   // Set default parallelism to 1, to mirror Storm default 
behavior
-   super.setParallelism(1);
+   // needs to be a class member for internal testing purpose
+   private final StormTopology stormTopology;
+
+   private final Map spouts;
+   private final Map bolts;
+
+   private final StreamExecutionEnvironment env;
+
+   private FlinkTopology(TopologyBuilder builder) {
+   this.builder = builder;
+   this.stormTopology = builder.createTopology();
+   // extract the spouts and bolts
+   this.spouts = getPrivateField("_spouts");
+   this.bolts = getPrivateField("_bolts");
+
+   this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Kick off the translation immediately
+   translateTopology();
}
 
/**
-* Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
-* FlinkClient}.
 *
-* @throws UnsupportedOperationException
-*  at every invocation
+* Creates a Flink program that uses the specified spouts and bolts.
+* @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
+* @return A Flink Topology which may be executed.
--- End diff --

Sorry for b

[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45895556
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
@@ -15,75 +16,474 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
+ * CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+   /** All declared streams and output schemas by operator ID */
+   private final HashMap> outputStreams = 
new HashMap>();
+   /** All spouts&bolts declarers by their ID */
+   private final HashMap declarers = 
new HashMap();
+
+   private final HashMap>> 
unprocessdInputsPerBolt =
+   new HashMap>>();
+
+   final HashMap>> 
availableInputs = new HashMap<>();
 
-   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
-   private int numberOfTasks = 0;
+   private final TopologyBuilder builder;
 
-   public FlinkTopology() {
-   // Set default parallelism to 1, to mirror Storm default 
behavior
-   super.setParallelism(1);
+   // needs to be a class member for internal testing purpose
+   private final StormTopology stormTopology;
+
+   private final Map spouts;
+   private final Map bolts;
+
+   private final StreamExecutionEnvironment env;
+
+   private FlinkTopology(TopologyBuilder builder) {
+   this.builder = builder;
+   this.stormTopology = builder.createTopology();
+   // extract the spouts and bolts
+   this.spouts = getPrivateField("_spouts");
+   this.bolts = getPrivateField("_bolts");
+
+   this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Kick off the translation immediately
+   translateTopology();
}
 
/**
-* Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
-* FlinkClient}.
 *
-* @throws UnsupportedOperationException
-*  at every invocation
+* Creates a Flink program that uses the specified spouts and bolts.
+* @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
+* @return A Flink Topology which may be executed.
 */
-   @Override
-   pu

[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45894573
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
 ---
@@ -613,29 +611,29 @@ public void testGetBinaryByFieldPojoGetter() throws 
Exception {
return new StormTuple(tuple, schema);
}
 
-   @Test(expected = UnsupportedOperationException.class)
+   @Test
public void testGetSourceGlobalStreamid() {
-   new StormTuple(null, null).getSourceGlobalStreamid();
+   Assert.assertNotNull(new StormTuple(null, 
null).getSourceGlobalStreamid());
--- End diff --

Can we improve all of these tests? Just a check for "not-null" seems a little 
limited to me. We should rather check for the default values and also cover 
the full constructor case.
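
For illustration, such a test could look roughly like the sketch below. It 
assumes the defaults (-1, "testStream", "componentID") from the two-argument 
constructor quoted in the diff, the standard backtype.storm.tuple.Tuple 
getters, and that the five-argument constructor takes (tuple, schema, taskId, 
streamId, componentId) in that order; it is a sketch only, not the committed 
test code.

    @Test
    public void testSourceMetadataDefaults() {
        // defaults taken from the two-argument constructor shown in the diff
        final StormTuple<Object> tuple = new StormTuple<Object>(null, null);
        Assert.assertEquals(-1, tuple.getSourceTask());
        Assert.assertEquals("testStream", tuple.getSourceStreamId());
        Assert.assertEquals("componentID", tuple.getSourceComponent());
    }

    @Test
    public void testSourceMetadataFullConstructor() {
        // explicit producer metadata via the full constructor
        final StormTuple<Object> tuple = new StormTuple<Object>(null, null, 3, "myStream", "myComponent");
        Assert.assertEquals(3, tuple.getSourceTask());
        Assert.assertEquals("myStream", tuple.getSourceStreamId());
        Assert.assertEquals("myComponent", tuple.getSourceComponent());
    }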


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45894174
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

Are these meaningful/helpful defaults? Why not just set them to `null`?
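
If plain `null` is preferred, the convenience constructor could simply make 
the "unset" state explicit, along the lines of the sketch below (parameter 
order follows the five-argument call in the diff; this is an illustration of 
the suggestion, not the committed code):

    StormTuple(final IN flinkTuple, final Fields schema) {
        // no meaningful producer metadata available in this code path
        this(flinkTuple, schema, -1, null, null);
    }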


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45893951
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
 ---
@@ -21,7 +21,6 @@
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
-
 import org.apache.flink.api.common.ExecutionConfig;
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45893875
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
 ---
@@ -16,16 +16,16 @@
  */
 package org.apache.flink.storm.util;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 public class TestSink implements IRichBolt {
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45893806
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
 ---
@@ -16,13 +16,13 @@
  */
 package org.apache.flink.storm.api;
 
-import java.util.Map;
-
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.OutputFieldsDeclarer;
 
+import java.util.Map;
+
 public class TestSpout implements IRichSpout {
private static final long serialVersionUID = -4884029383198924007L;
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45893834
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
 ---
@@ -26,6 +24,8 @@
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+import java.util.Map;
+
 public class TestDummyBolt implements IRichBolt {
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45893853
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
 ---
@@ -26,6 +24,8 @@
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
+import java.util.Map;
+
 public class TestDummySpout implements IRichSpout {
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45893736
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
-   final FlinkTopology topology = new FlinkTopology();
-   Assert.assertEquals(1, topology.getParallelism());
+   @Test(expected = RuntimeException.class)
+   public void testUnknowSpout() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecute() throws Exception {
-   new FlinkTopology().execute();
+   @Test(expected = RuntimeException.class)
+   public void testUnknowBolt() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt1", new 
TestBolt()).shuffleGrouping("spout");
+   builder.setBolt("bolt2", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecuteWithName() throws Exception {
-   new FlinkTopology().execute(null);
+   @Test(expected = RuntimeException.class)
+   public void testUndeclaredStream() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("spout");
+
+   FlinkTopology.createTopology(builder);
}
 
@Test
-   public void testNumberOfTasks() {
-   final FlinkTopology topology = new FlinkTopology();
+   @Ignore
--- End diff --

Please enable this test. I forgot to do this in my last commit which fixes 
this issue...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45893790
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
 ---
@@ -16,14 +16,14 @@
  */
 package org.apache.flink.storm.api;
 
-import java.util.Map;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 
+import java.util.Map;
+
 public class TestBolt implements IRichBolt {
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45893753
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
-   final FlinkTopology topology = new FlinkTopology();
-   Assert.assertEquals(1, topology.getParallelism());
+   @Test(expected = RuntimeException.class)
+   public void testUnknowSpout() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecute() throws Exception {
-   new FlinkTopology().execute();
+   @Test(expected = RuntimeException.class)
+   public void testUnknowBolt() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt1", new 
TestBolt()).shuffleGrouping("spout");
+   builder.setBolt("bolt2", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecuteWithName() throws Exception {
-   new FlinkTopology().execute(null);
+   @Test(expected = RuntimeException.class)
+   public void testUndeclaredStream() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("spout");
+
+   FlinkTopology.createTopology(builder);
}
 
@Test
-   public void testNumberOfTasks() {
-   final FlinkTopology topology = new FlinkTopology();
+   @Ignore
+   public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
+   TopologyBuilder builder = new TopologyBuilder();
 
-   Assert.assertEquals(0, topology.getNumberOfTasks());
+   builder.setSpout("spout", new TestDummySpout());
+   builder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
+   TestDummySpout.spoutStreamId, new Fields("id"));
 
-   topology.increaseNumberOfTasks(3);
-   Assert.assertEquals(3, topology.getNumberOfTasks());
+   FlinkTopology.createTopology(builder);
+   }
 
-   topology.increaseNumberOfTasks(2);
-   Assert.assertEquals(5, topology.getNumberOfTasks());
+   @Test
+   @Ignore
--- End diff --

Please enable this test. I forgot to do this in my last commit which fixes 
this issue...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45892911
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
@@ -15,75 +16,474 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
+ * CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+   /** All declared streams and output schemas by operator ID */
+   private final HashMap> outputStreams = 
new HashMap>();
+   /** All spouts&bolts declarers by their ID */
+   private final HashMap declarers = 
new HashMap();
+
+   private final HashMap>> 
unprocessdInputsPerBolt =
+   new HashMap>>();
+
+   final HashMap>> 
availableInputs = new HashMap<>();
 
-   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
-   private int numberOfTasks = 0;
+   private final TopologyBuilder builder;
 
-   public FlinkTopology() {
-   // Set default parallelism to 1, to mirror Storm default 
behavior
-   super.setParallelism(1);
+   // needs to be a class member for internal testing purpose
+   private final StormTopology stormTopology;
+
+   private final Map spouts;
+   private final Map bolts;
+
+   private final StreamExecutionEnvironment env;
+
+   private FlinkTopology(TopologyBuilder builder) {
+   this.builder = builder;
+   this.stormTopology = builder.createTopology();
+   // extract the spouts and bolts
+   this.spouts = getPrivateField("_spouts");
+   this.bolts = getPrivateField("_bolts");
+
+   this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Kick off the translation immediately
+   translateTopology();
}
 
/**
-* Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
-* FlinkClient}.
 *
-* @throws UnsupportedOperationException
-*  at every invocation
+* Creates a Flink program that uses the specified spouts and bolts.
+* @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
+* @return A Flink Topology which may be executed.
 */
-   @Override
-   publ

[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45892746
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
@@ -15,75 +16,474 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
+ * CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+   /** All declared streams and output schemas by operator ID */
+   private final HashMap> outputStreams = 
new HashMap>();
+   /** All spouts&bolts declarers by their ID */
+   private final HashMap declarers = 
new HashMap();
+
+   private final HashMap>> 
unprocessdInputsPerBolt =
+   new HashMap>>();
+
+   final HashMap>> 
availableInputs = new HashMap<>();
 
-   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
-   private int numberOfTasks = 0;
+   private final TopologyBuilder builder;
 
-   public FlinkTopology() {
-   // Set default parallelism to 1, to mirror Storm default 
behavior
-   super.setParallelism(1);
+   // needs to be a class member for internal testing purpose
+   private final StormTopology stormTopology;
+
+   private final Map spouts;
+   private final Map bolts;
+
+   private final StreamExecutionEnvironment env;
+
+   private FlinkTopology(TopologyBuilder builder) {
+   this.builder = builder;
+   this.stormTopology = builder.createTopology();
+   // extract the spouts and bolts
+   this.spouts = getPrivateField("_spouts");
+   this.bolts = getPrivateField("_bolts");
+
+   this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Kick off the translation immediately
+   translateTopology();
}
 
/**
-* Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
-* FlinkClient}.
 *
-* @throws UnsupportedOperationException
-*  at every invocation
+* Creates a Flink program that uses the specified spouts and bolts.
+* @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
+* @return A Flink Topology which may be executed.
--- End diff --

Let me get th

[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45892009
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
--- End diff --

Why was this test removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45891849
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
 ---
@@ -18,9 +18,7 @@
 
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45891225
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
+
/**
 * Create a new Storm tuple from the given Flink tuple. The provided 
{@code nameIndexMap} is ignored for raw input
 * types.
-* 
-* @param flinkTuple
+*  @param flinkTuple
 *  The Flink tuple to be converted.
 * @param schema
-*  The schema (ie, ordered field names) of the tuple.
+* @param producerComponentId
 */
--- End diff --

formatting; the JavaDoc is incomplete.
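
A completed version of that JavaDoc could read as follows; the parameter 
names are the ones visible in the diff, while the descriptions are only a 
suggestion:

    /**
     * Create a new Storm tuple from the given Flink tuple. The provided {@code nameIndexMap} is ignored for raw input
     * types.
     *
     * @param flinkTuple
     *            The Flink tuple to be converted.
     * @param schema
     *            The schema (ie, ordered field names) of the tuple.
     * @param taskId
     *            The ID of the task that produced the tuple.
     * @param producerStreamId
     *            The ID of the stream the tuple was emitted to.
     * @param producerComponentId
     *            The ID of the component that produced the tuple.
     */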


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45891117
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
 ---
@@ -33,7 +30,8 @@
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashMap;
 
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45891069
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
 ---
@@ -18,7 +18,6 @@
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.spout.ISpoutOutputCollector;
-
 import org.apache.flink.api.java.tuple.Tuple0;
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45891019
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
 ---
@@ -17,12 +17,12 @@
 
 package org.apache.flink.storm.wrappers;
 
-import java.util.HashMap;
-
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45890996
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45890935
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+/**
+ * A {@link BoltWrapperTwoInput} wraps an {@link IRichBolt} in order to 
execute the Storm bolt within a Flink Streaming
+ * program. In contrast to {@link BoltWrapper}, this wrapper takes two 
input stream as input.
+ */
+public class BoltWrapperTwoInput extends BoltWrapper implements TwoInputStreamOperator {
+
+   /** The schema (ie, ordered field names) of the second input stream. */
+   private final Fields inputSchema2;
+
+   private final String componentId2;
+   private final String streamId2;
+
+   /**
+* Instantiates a new {@link BoltWrapperTwoInput} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
+* used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
+* {@link Tuple0} to {@link Tuple25}. The output type can be any type 
if parameter {@code rawOutput} is {@code true}
+* and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
+* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
+*  @param bolt
+*The Storm {@link IRichBolt bolt} to be used.
+* @param boltId
+* @param componentId2
+* @param streamId1
+* @param inputSchema1
+*The schema (ie, ordered field names) of the input stream.
@throws IllegalArgumentException
+* If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+* {@code rawOuput} is {@code false} and the number of declared 
output attributes is not with range
+* */
--- End diff --

formatting (space and stars); incomplete JavaDoc; missing `@throws`
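
For the parameters that are visible in the quoted diff, a cleaned-up fragment 
could look like the sketch below; the descriptions are suggestions and the 
`@throws` text mirrors the wording used for the other wrapper constructors:

    * @param bolt
    *            The Storm {@link IRichBolt bolt} to be used.
    * @param boltId
    *            The name of the bolt.
    * @param streamId1
    *            The ID of the first input stream.
    * @param componentId2
    *            The ID of the component producing the second input stream.
    * @param inputSchema1
    *            The schema (ie, ordered field names) of the first input stream.
    * @throws IllegalArgumentException
    *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
    *             {@code rawOuput} is {@code false} and the number of declared output attributes is not within range [0;25].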


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45890812
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+/**
+ * A {@link BoltWrapperTwoInput} wraps an {@link IRichBolt} in order to 
execute the Storm bolt within a Flink Streaming
+ * program. In contrast to {@link BoltWrapper}, this wrapper takes two 
input stream as input.
+ */
+public class BoltWrapperTwoInput extends BoltWrapper implements TwoInputStreamOperator {
+
+   /** The schema (ie, ordered field names) of the second input stream. */
+   private final Fields inputSchema2;
+
+   private final String componentId2;
+   private final String streamId2;
+
--- End diff --

missing JavaDoc for both members
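
Even a one-liner each would do, for example (the wording is a suggestion 
only):

    /** The ID of the component that produces the second input stream. */
    private final String componentId2;
    /** The ID of the second input stream. */
    private final String streamId2;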


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45890399
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 ---
@@ -108,20 +112,19 @@ public BoltWrapper(final IRichBolt bolt, final Fields 
inputSchema)
 * for POJO input types. The output type can be any type if parameter 
{@code rawOutput} is {@code true} and the
 * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
 * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-* 
-* @param bolt
+*  @param bolt
 *The Storm {@link IRichBolt bolt} to be used.
+* @param inputStreamId
+* @param inputComponentId
 * @param rawOutputs
 *Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-*of a raw type.
-* @throws IllegalArgumentException
--- End diff --

keep `@throws`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45890269
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 ---
@@ -89,17 +94,16 @@ public BoltWrapper(final IRichBolt bolt) throws 
IllegalArgumentException {
 * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
 * {@link Tuple0} to {@link Tuple25}. The output type will be one of 
{@link Tuple0} to {@link Tuple25} depending on
 * the bolt's declared number of attributes.
-* 
-* @param bolt
+*  @param bolt
 *The Storm {@link IRichBolt bolt} to be used.
+* @param inputStreamId
+* @param inputComponentId
 * @param inputSchema
-*The schema (ie, ordered field names) of the input stream.
-* @throws IllegalArgumentException
-* If the number of declared output attributes is not with 
range [0;25].
--- End diff --

Why do you delete `@throws`? More documentation is always better.
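
Restoring it is a two-line addition; the sketch below simply mirrors the 
deleted text (with the "with range" typo fixed):

    * @throws IllegalArgumentException
    *             If the number of declared output attributes is not within range [0;25].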


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45890289
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 ---
@@ -108,20 +112,19 @@ public BoltWrapper(final IRichBolt bolt, final Fields 
inputSchema)
 * for POJO input types. The output type can be any type if parameter 
{@code rawOutput} is {@code true} and the
 * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
 * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-* 
-* @param bolt
+*  @param bolt
--- End diff --

extra space before `@param`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45890041
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 ---
@@ -89,17 +94,16 @@ public BoltWrapper(final IRichBolt bolt) throws 
IllegalArgumentException {
 * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
 * {@link Tuple0} to {@link Tuple25}. The output type will be one of 
{@link Tuple0} to {@link Tuple25} depending on
 * the bolt's declared number of attributes.
-* 
-* @param bolt
+*  @param bolt
 *The Storm {@link IRichBolt bolt} to be used.
--- End diff --

delete one space before `@param`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45889924
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 ---
@@ -77,11 +80,13 @@
 * 
 * @param bolt
 *The Storm {@link IRichBolt bolt} to be used.
+* @param inputStreamId
+* @param inputComponentId
--- End diff --

JavaDoc incomplete -- same below -- I will not mark it again. Please complete 
it everywhere.
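
For example (the descriptions are a suggestion, derived from how the two 
fields are used by the wrapper):

    * @param inputStreamId
    *            The ID of the input stream this bolt reads from.
    * @param inputComponentId
    *            The ID of the upstream component that produces the input stream.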


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45889838
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 ---
@@ -53,21 +51,26 @@
private static final long serialVersionUID = -4788589118464155835L;
 
/** The wrapped Storm {@link IRichBolt bolt}. */
-   private final IRichBolt bolt;
+   protected final IRichBolt bolt;
/** The name of the bolt. */
private final String name;
/** Number of attributes of the bolt's output tuples per stream. */
-   private final HashMap numberOfAttributes;
+   protected final HashMap numberOfAttributes;
/** The schema (ie, ordered field names) of the input stream. */
-   private final Fields inputSchema;
+   protected final Fields inputSchema;
/** The original Storm topology. */
protected StormTopology stormTopology;
 
+   protected transient TopologyContext topologyContext;
+
+   protected final String inputComponentId;
+   protected final String inputStreamId;
+
--- End diff --

Please add JavaDoc to those three.
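
A minimal sketch of what that could look like (descriptions are suggestions, 
not committed wording):

    /** The topology context handed to the wrapped bolt; created at runtime, hence transient. */
    protected transient TopologyContext topologyContext;

    /** The ID of the component that produces the bolt's input stream. */
    protected final String inputComponentId;
    /** The ID of the bolt's input stream. */
    protected final String inputStreamId;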


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45889473
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
 ---
@@ -19,7 +19,6 @@
 
 import backtype.storm.task.IOutputCollector;
 import backtype.storm.tuple.Tuple;
-
 import org.apache.flink.api.java.tuple.Tuple0;
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45889260
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Tweets can be filtered by keywords.
+ */
+public class PrintSampleStream {
+   public static void main(String[] args) throws Exception {
--- End diff --

It shows how to run an existing Storm topology with Flink. It prints tweets 
from Twitter, which is kind of neat. It's also included in Storm. It's nice to 
have some examples other than WordCount. This was actually not working before 
this PR...
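
For reference, the core of the example boils down to the pattern below. This 
is a sketch: the Twitter credentials and spout construction are elided, and 
the class and method names follow the imports visible in the quoted file 
header plus the FlinkLocalCluster/FlinkTopology API from this PR.

    TopologyBuilder builder = new TopologyBuilder();
    // builder.setSpout("twitter", new TwitterSampleSpout(...));  // storm-starter spout, credentials elided
    // builder.setBolt("print", new PrinterBolt()).shuffleGrouping("twitter");

    Config conf = new Config();
    FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology("Print", conf, FlinkTopology.createTopology(builder));
    Utils.sleep(10 * 1000);
    cluster.shutdown();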


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45889142
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
@@ -15,75 +16,474 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
+ * CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+   /** All declared streams and output schemas by operator ID */
+   private final HashMap> outputStreams = 
new HashMap>();
+   /** All spouts&bolts declarers by their ID */
+   private final HashMap declarers = 
new HashMap();
+
+   private final HashMap>> 
unprocessdInputsPerBolt =
+   new HashMap>>();
+
+   final HashMap>> 
availableInputs = new HashMap<>();
 
-   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
-   private int numberOfTasks = 0;
+   private final TopologyBuilder builder;
 
-   public FlinkTopology() {
-   // Set default parallelism to 1, to mirror Storm default 
behavior
-   super.setParallelism(1);
+   // needs to be a class member for internal testing purpose
+   private final StormTopology stormTopology;
+
+   private final Map spouts;
+   private final Map bolts;
+
+   private final StreamExecutionEnvironment env;
+
+   private FlinkTopology(TopologyBuilder builder) {
+   this.builder = builder;
+   this.stormTopology = builder.createTopology();
+   // extract the spouts and bolts
+   this.spouts = getPrivateField("_spouts");
+   this.bolts = getPrivateField("_bolts");
+
+   this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Kick off the translation immediately
+   translateTopology();
}
 
/**
-* Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
-* FlinkClient}.
 *
-* @throws UnsupportedOperationException
-*  at every invocation
+* Creates a Flink program that uses the specified spouts and bolts.
+* @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
+* @return A Flink Topology which may be executed.
 */
-   @Override
-   pu
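
The constructor quoted above pulls the `TopologyBuilder`'s private `_spouts` and
`_bolts` maps out via reflection. The helper's body is cut off in this quote; a
minimal sketch of what such a `getPrivateField` helper can look like (the static
form and the error handling here are assumptions, not the PR's verbatim code):

```java
import backtype.storm.topology.TopologyBuilder;

import java.lang.reflect.Field;

// Minimal sketch, assuming the helper simply reads one of TopologyBuilder's
// private maps (e.g. "_spouts" or "_bolts").
final class TopologyIntrospection {
	@SuppressWarnings("unchecked")
	static <T> T getPrivateField(TopologyBuilder builder, String fieldName) {
		try {
			final Field field = builder.getClass().getDeclaredField(fieldName);
			field.setAccessible(true);
			return (T) field.get(builder);
		} catch (NoSuchFieldException | IllegalAccessException e) {
			throw new RuntimeException("Couldn't access TopologyBuilder." + fieldName, e);
		}
	}
}
```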

[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r4594
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
@@ -15,75 +16,474 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
+ * CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+   /** All declared streams and output schemas by operator ID */
+   private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
+   /** All spouts&bolts declarers by their ID */
+   private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+
+   private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
+   new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+   final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
 
-   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
-   private int numberOfTasks = 0;
+   private final TopologyBuilder builder;
 
-   public FlinkTopology() {
-   // Set default parallelism to 1, to mirror Storm default 
behavior
-   super.setParallelism(1);
+   // needs to be a class member for internal testing purpose
+   private final StormTopology stormTopology;
+
+   private final Map<String, IRichSpout> spouts;
+   private final Map<String, IRichBolt> bolts;
+
+   private final StreamExecutionEnvironment env;
+
+   private FlinkTopology(TopologyBuilder builder) {
+   this.builder = builder;
+   this.stormTopology = builder.createTopology();
+   // extract the spouts and bolts
+   this.spouts = getPrivateField("_spouts");
+   this.bolts = getPrivateField("_bolts");
+
+   this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Kick off the translation immediately
+   translateTopology();
}
 
/**
-* Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
-* FlinkClient}.
 *
-* @throws UnsupportedOperationException
-*  at every invocation
+* Creates a Flink program that uses the specified spouts and bolts.
+* @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
+* @return A Flink Topology which may be executed.
 */
-   @Override
-   pu

[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45888777
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
@@ -15,75 +16,474 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
+ * CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+   /** All declared streams and output schemas by operator ID */
+   private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
+   /** All spouts&bolts declarers by their ID */
+   private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+
+   private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
+   new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+   final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
 
-   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
-   private int numberOfTasks = 0;
+   private final TopologyBuilder builder;
 
-   public FlinkTopology() {
-   // Set default parallelism to 1, to mirror Storm default 
behavior
-   super.setParallelism(1);
+   // needs to be a class member for internal testing purpose
+   private final StormTopology stormTopology;
+
+   private final Map<String, IRichSpout> spouts;
+   private final Map<String, IRichBolt> bolts;
+
+   private final StreamExecutionEnvironment env;
+
+   private FlinkTopology(TopologyBuilder builder) {
+   this.builder = builder;
+   this.stormTopology = builder.createTopology();
+   // extract the spouts and bolts
+   this.spouts = getPrivateField("_spouts");
+   this.bolts = getPrivateField("_bolts");
+
+   this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Kick off the translation immediately
+   translateTopology();
}
 
/**
-* Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
-* FlinkClient}.
 *
-* @throws UnsupportedOperationException
-*  at every invocation
+* Creates a Flink program that uses the specified spouts and bolts.
+* @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
+* @return A Flink Topology which may be executed.
--- End diff --

typo: Strom

[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45887969
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
 ---
@@ -20,11 +20,9 @@
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45887487
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
 ---
@@ -99,6 +111,7 @@ public void rebalance(final String name, final 
RebalanceOptions options) {
 
public void shutdown() {
flink.stop();
+   flink = null;
--- End diff --

Should be kept. Otherwise, calling `submitTopologyWithOpts` a second time 
will run into an NPE. (Or add a proper exception in the `else` branch of the 
`if (flink == null)` check, i.e., "Cannot run topology. Cluster got shut down." 
or similar.)
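
For illustration, the pattern under discussion is a lazily created cluster
handle guarded by a null check; a generic sketch of the two options (all names
here are made up, this is not the PR's code):

```java
// Generic illustration of the two options above for a handle that shutdown()
// sets to null: transparently re-create the cluster on the next submit, or
// fail fast with a descriptive exception.
final class LazyClusterHandle {
	private Object cluster; // stands in for the local Flink mini cluster

	void submit() {
		if (this.cluster == null) {
			this.cluster = new Object(); // option 1: (re)start transparently
			// option 2: throw new IllegalStateException(
			//         "Cannot run topology. Cluster got shut down.");
		}
		// ... translate and submit the topology to this.cluster ...
	}

	void shutdown() {
		// stop the cluster, then drop the reference so submit() can restart it
		this.cluster = null;
	}
}
```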


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45886799
  
--- Diff: 
flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
 ---
@@ -17,8 +17,6 @@
  */
 package org.apache.flink.storm.split;
 
-import java.util.Map;
-
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45886615
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
 ---
@@ -17,10 +17,9 @@
 
 package org.apache.flink.storm.wordcount.operators;
 
-import org.apache.flink.storm.util.FileSpout;
-
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45886597
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
 ---
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wordcount.operators;
 
-import java.io.Serializable;
-
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45886536
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
 ---
@@ -19,7 +19,6 @@
 
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.utils.Utils;
-
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45886387
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
 ---
@@ -57,16 +57,13 @@ public static void main(final String[] args) throws 
Exception {
}
 
// build Topology the Storm way
-   final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
+   final TopologyBuilder builder = 
WordCountTopology.buildTopology(false);
 
--- End diff --

Please remove `false` -- this test should use index (and not name) to 
specify the key. `WordCountLocalByName` does it the other way round such that 
both cases are covered.
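
For reference, the index-vs-name distinction the two test variants cover boils
down to how the bolt reads the key attribute; a small illustration (assumed to
mirror what the two WordCount variants exercise, not quoted from them):

```java
import backtype.storm.tuple.Tuple;

// Illustration only: positional vs. named access to the key attribute inside
// a bolt's execute(); assumed to mirror WordCountLocal vs. WordCountLocalByName.
final class KeyAccess {
	static String byIndex(Tuple input) {
		return input.getString(0); // key referenced by attribute index
	}

	static String byName(Tuple input) {
		return input.getStringByField("word"); // key referenced by attribute name
	}
}
```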


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45885615
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
 ---
@@ -17,9 +17,6 @@
  */
 package org.apache.flink.storm.split.operators;
 
-import java.util.Map;
-import java.util.Random;
-
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45885626
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
 ---
@@ -17,8 +17,6 @@
  */
 package org.apache.flink.storm.split.operators;
 
-import java.util.Map;
-
--- End diff --

pure reformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45885529
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Tweets can be filtered by keywords.
+ */
+public class PrintSampleStream {
+   public static void main(String[] args) throws Exception {
--- End diff --

What is the purpose of this example? Does it show anything special?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1398#issuecomment-159641971
  
I've rebased to the latest master and addressed your comments. I would like 
to merge this and programmatically fix the multiple inputs issue afterwards.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45854121
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
 ---
@@ -150,7 +153,7 @@ static synchronized TopologyContext 
createTopologyContext(
}
stormTopology = new StormTopology(spouts, bolts, new 
HashMap());
 
-   taskId = context.getIndexOfThisSubtask();
+   taskId = context.getIndexOfThisSubtask() + 1;
 
--- End diff --

Actually, it doesn't matter. I set this before changing the topology 
parsing logic. Some topologies would only run with this fix, but that has since 
been fixed, so the +1 is not necessary anymore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45758926
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -180,8 +178,6 @@ public void testCreateTopologyContext() {
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), 
dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
.shuffleGrouping("bolt1", 
TestDummyBolt.groupingStreamId)
-   .shuffleGrouping("bolt1", 
TestDummyBolt.shuffleStreamId)
-   .shuffleGrouping("bolt2", 
TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", 
TestDummyBolt.shuffleStreamId);
--- End diff --

Don't get me wrong. I don't want to discard your work! And I believe 
that you did not intend to end up with a "messy" PR. But that's the current state.

I think we can refine and merge it. But it does not resolve FLINK-2837, even 
if it improves on it. I would also assume that your union code will be 
reworked heavily later on... Not sure about your tuple meta information code; I 
need to have a look in detail. That is the reason why I had the idea to apply 
the discussed API changes only in a single PR. But if this is too complex, we 
should just carry on with this PR.

Btw: even if the JIRA is quite old, it is not assigned to you; thus you 
should have asked about it. You did the same with FLINK-2837, which was assigned 
to me, too -- I had not worked on it yet, so I assigned it to you (I thought 
that, since you had the union code together with the API changes, that should 
be fine).

Additionally, the reason I just assigned it to you was that FLINK-2837 is 
actually a requirement for FLINK-2721. That is why I stopped working on it back 
then, but I did not have time to fix FLINK-2837 either. I did not assume that 
you would tackle the join case, which does require the tuple meta info... A 
regular union does not require it.

Anyway. Let's just get this PR done. :) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45747566
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -180,8 +178,6 @@ public void testCreateTopologyContext() {
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), 
dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
.shuffleGrouping("bolt1", 
TestDummyBolt.groupingStreamId)
-   .shuffleGrouping("bolt1", 
TestDummyBolt.shuffleStreamId)
-   .shuffleGrouping("bolt2", 
TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", 
TestDummyBolt.shuffleStreamId);
--- End diff --

I was actually not specifically trying to address JIRA issues but just 
fixed everything I discovered along the way while trying out the compatibility 
layer. Only after fixing did I realize there are open JIRA issues. One is 
assigned to me (FLINK-2837) and the other one (FLINK-2721) has been open for 
two months. I think it would be a shame not to merge this pull request soon. It 
provides a good foundation for addressing any further issues. Splitting this PR 
would not be trivial with all the changes.

I already accommodated you with the API changes. Also, I would like to 
address most of your comments, but I'm not too inclined to split up this PR (if 
that is even possible). Could you base your work on this pull request and do a 
follow-up? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45744427
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -180,8 +178,6 @@ public void testCreateTopologyContext() {
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), 
dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
.shuffleGrouping("bolt1", 
TestDummyBolt.groupingStreamId)
-   .shuffleGrouping("bolt1", 
TestDummyBolt.shuffleStreamId)
-   .shuffleGrouping("bolt2", 
TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", 
TestDummyBolt.shuffleStreamId);
--- End diff --

I see your point. But this PR might be too big anyway. You try to do 3 things 
at the same time (two are backed by a JIRA). How hard would it be to split 
this PR? Last but not least, the multi-input-stream JIRA is not resolved by 
this. [And the second JIRA you try to resolve is assigned to me, and I have 
already worked on it -- I would actually like to finish my work on it.]


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45743505
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -180,8 +178,6 @@ public void testCreateTopologyContext() {
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), 
dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
.shuffleGrouping("bolt1", 
TestDummyBolt.groupingStreamId)
-   .shuffleGrouping("bolt1", 
TestDummyBolt.shuffleStreamId)
-   .shuffleGrouping("bolt2", 
TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", 
TestDummyBolt.shuffleStreamId);
--- End diff --

Well. Get it right from the beginning. I think it was anything but right until 
now :) And in this regard, it's much more defined now. At least you get an error 
if you have more than two inputs.

I agree that we should fix this. But it's going to be a bit tricky because 
we have to hack around Flink's limitation. I would rather not do this in this 
pull request.
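
The error mentioned here is presumably just a precondition during topology
translation; a hedged sketch of such a guard (the message text and names are
assumptions, not the PR's exact check):

```java
import com.google.common.base.Preconditions;

import java.util.Map;

// Hedged sketch of a "more than two inputs" guard during topology translation;
// only the shape of the check is shown, not the PR's implementation.
final class InputCountGuard {
	static void checkAtMostTwoInputs(String boltId, Map<String, ?> inputsByProducer) {
		Preconditions.checkArgument(inputsByProducer.size() <= 2,
				"Bolt '%s' declares %s inputs, but at most two are supported so far.",
				boltId, inputsByProducer.size());
	}
}
```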


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45742093
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -180,8 +178,6 @@ public void testCreateTopologyContext() {
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), 
dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
.shuffleGrouping("bolt1", 
TestDummyBolt.groupingStreamId)
-   .shuffleGrouping("bolt1", 
TestDummyBolt.shuffleStreamId)
-   .shuffleGrouping("bolt2", 
TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", 
TestDummyBolt.shuffleStreamId);
--- End diff --

Well. From a Storm point of view, there is only `union`. As it is the 
generic Storm case, it includes the join case. I guess your specialized join 
solution would become obsolete once generic union is supported. Therefore, I 
would prefer to get it right from the beginning... My idea would be to try to 
get rid of `TwoInputBoltWrapper` and "union" the incoming streams somehow to 
feed a single stream to `BoltWrapper`. The tricky part is that we cannot use 
Flink's `union` directly, because it assumes the same input type, while Storm 
can union different types into one stream... What do you think about this idea? 
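
To make the idea concrete, a rough sketch (not the PR's implementation; the
wrapper type and all names are made up) of normalizing both inputs to one
element type before calling `union`:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// Rough sketch: map each differently-typed input onto one wrapper type so that
// Flink's union() -- which requires identical element types -- can merge them
// for a single bolt wrapper. TaggedValue is an assumption, not part of the PR.
public final class UnionSketch {

	public static class TaggedValue implements java.io.Serializable {
		private static final long serialVersionUID = 1L;
		public String sourceStreamId; // logical Storm stream the value came from
		public Object value;          // the original tuple, type-erased

		public TaggedValue() {}

		public TaggedValue(String sourceStreamId, Object value) {
			this.sourceStreamId = sourceStreamId;
			this.value = value;
		}
	}

	public static <A, B> DataStream<TaggedValue> merge(DataStream<A> in1, DataStream<B> in2) {
		final DataStream<TaggedValue> tagged1 = in1.map(new MapFunction<A, TaggedValue>() {
			@Override
			public TaggedValue map(A value) {
				return new TaggedValue("input-1", value);
			}
		});
		final DataStream<TaggedValue> tagged2 = in2.map(new MapFunction<B, TaggedValue>() {
			@Override
			public TaggedValue map(B value) {
				return new TaggedValue("input-2", value);
			}
		});
		return tagged1.union(tagged2); // identical element types, so union is legal
	}
}
```

A single bolt wrapper could then unwrap `TaggedValue` again before handing the
original tuple to the user bolt.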


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45741402
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
 ---
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.join;
+
+import backtype.storm.Config;
+import backtype.storm.testing.FeederSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.TupleOutputFormatter;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SingleJoinBolt;
+
+
+public class SingleJoinExample {
+
+   public static void main(String[] args) throws Exception {
+   final FeederSpout genderSpout = new FeederSpout(new 
Fields("id", "gender"));
+   final FeederSpout ageSpout = new FeederSpout(new Fields("id", 
"age"));
+
--- End diff --

Okay should be doable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45741186
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -180,8 +178,6 @@ public void testCreateTopologyContext() {
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), 
dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
.shuffleGrouping("bolt1", 
TestDummyBolt.groupingStreamId)
-   .shuffleGrouping("bolt1", 
TestDummyBolt.shuffleStreamId)
-   .shuffleGrouping("bolt2", 
TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", 
TestDummyBolt.shuffleStreamId);
--- End diff --

Agreed. My concern was to get the join working, but union should also be 
supported for identical output data types. Could we do that in a follow-up?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45740805
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
 ---
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.join;
+
+import backtype.storm.Config;
+import backtype.storm.testing.FeederSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.TupleOutputFormatter;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SingleJoinBolt;
+
+
+public class SingleJoinExample {
+
+   public static void main(String[] args) throws Exception {
+   final FeederSpout genderSpout = new FeederSpout(new 
Fields("id", "gender"));
+   final FeederSpout ageSpout = new FeederSpout(new Fields("id", 
"age"));
+
--- End diff --

Yes. :) For example, 2 fields for one input and 3 fields for the other 
input. (The idea is to use a different number for each input, to make sure the 
two input schemas can differ not only in their types but also in their 
number of attributes.)
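
Concretely, the change being asked for could look like this (the extra third
field name is invented purely to change the arity):

```java
import backtype.storm.testing.FeederSpout;
import backtype.storm.tuple.Fields;

// Sketch only: give the two join inputs a different number of attributes
// (2 vs. 3) so the test also covers schemas that differ in arity, not just in
// field types. The "country" field is an invented example.
public final class DifferentAritySpouts {
	public static void main(String[] args) {
		final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
		final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age", "country"));
		// ... wire both spouts into the join topology as before ...
	}
}
```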


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45740238
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
 ---
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.join;
+
+import backtype.storm.Config;
+import backtype.storm.testing.FeederSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.TupleOutputFormatter;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SingleJoinBolt;
+
+
+public class SingleJoinExample {
+
+   public static void main(String[] args) throws Exception {
+   final FeederSpout genderSpout = new FeederSpout(new 
Fields("id", "gender"));
+   final FeederSpout ageSpout = new FeederSpout(new Fields("id", 
"age"));
+
--- End diff --

You mean a different number of fields?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45735762
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
 ---
@@ -18,20 +18,23 @@
 package org.apache.flink.storm.util;
 
 import backtype.storm.task.TopologyContext;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 
 import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.util.Map;
 
 /**
- * Implements a sink that write the received data to the given file (as a 
result of {@code Object.toString()} for each
+ * Implements a sink that writes the received data to the given file (as a 
result of {@code Object.toString()} for each
  * attribute).
  */
 public final class BoltFileSink extends AbstractBoltSink {
private static final long serialVersionUID = 2014027288631273666L;
 
-   private final String path;
+   private final Path path;
private BufferedWriter writer;
--- End diff --

No. People will have spout/bolt code they do not want to touch when running 
it in Flink. Thus their code will be written in the same way -- and so should 
the examples be. Otherwise, we give the impression that they need to change 
their code -- but they don't. Thus, we implement the example Spouts/Bolts in a 
pure Storm way. Of course, if somebody develops a new Spout/Bolt with Flink in 
mind, your approach makes sense. However, this is not the main focus (it would 
be even better to code new stuff Flink-native in embedded mode, instead of 
developing Spouts/Bolts which are Flink-tailored).
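
As a concrete illustration of the "pure Storm way" being argued for, such a
sink keeps a plain `String` path and `java.io` classes only (this is a hedged
reconstruction, not the original file):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

// Hedged illustration of a Flink-agnostic sink: plain String path plus
// java.io, no Flink FileSystem/Path types. Field names mirror the quoted
// diff; the method body is an assumption.
public final class PlainFileSinkSketch {
	private final String path;
	private BufferedWriter writer;

	public PlainFileSinkSketch(String path) {
		this.path = path;
	}

	public void prepare() {
		try {
			this.writer = new BufferedWriter(new FileWriter(this.path));
		} catch (IOException e) {
			throw new RuntimeException("Cannot open sink file: " + this.path, e);
		}
	}
}
```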


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45734993
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
 ---
@@ -38,6 +38,8 @@
protected String path = null;
protected BufferedReader reader;
 
+   protected boolean finished;
+
public FileSpout() {}
--- End diff --

The point is the following: `FileSpout` is a Flink-agnostic implementation 
that is extended in a Flink-aware way by `FiniteFileSpout`. Thus, `FileSpout` 
should be implemented the Storm way, without any knowledge of Flink. And 
`FiniteFileSpout` should use Flink stuff, but not change the code of 
`FileSpout`.
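
A sketch of the layering being described (assuming flink-storm's `FiniteSpout`
interface with a `reachedEnd()` method, the protected `reader` shown in the
quoted diff, and an inherited `collector` member, which is an assumption):

```java
import backtype.storm.tuple.Values;
import org.apache.flink.storm.util.FileSpout;
import org.apache.flink.storm.util.FiniteSpout;

import java.io.IOException;

// Sketch only: the base FileSpout stays pure Storm, and the Flink-aware
// subclass tracks end-of-input itself instead of adding a flag to the base
// class. `reader` and `collector` are assumed protected members of FileSpout.
public class FiniteFileSpoutSketch extends FileSpout implements FiniteSpout {
	private static final long serialVersionUID = 1L;

	private boolean finished = false;

	@Override
	public void nextTuple() {
		final String line;
		try {
			line = this.reader.readLine();
			if (line == null) {
				this.finished = true; // end of file reached
			} else {
				this.collector.emit(new Values(line));
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public boolean reachedEnd() {
		return this.finished;
	}
}
```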


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45734441
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
 ---
@@ -18,20 +18,23 @@
 package org.apache.flink.storm.util;
 
 import backtype.storm.task.TopologyContext;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 
 import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.util.Map;
 
 /**
- * Implements a sink that write the received data to the given file (as a 
result of {@code Object.toString()} for each
+ * Implements a sink that writes the received data to the given file (as a 
result of {@code Object.toString()} for each
  * attribute).
  */
 public final class BoltFileSink extends AbstractBoltSink {
private static final long serialVersionUID = 2014027288631273666L;
 
-   private final String path;
+   private final Path path;
private BufferedWriter writer;
--- End diff --

So people download Flink and learn about the Storm compatibility layer to 
write spouts/bolts which they use in Storm topologies that run without Flink? 
That is beyond my imagination :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

