[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-08 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38906129
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.DataOutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
--- End diff --

The ctx is used to get the received msg from socket server.
So I override the toString() method in ctx.
I think it will be invalid if we use mockito to check the received msg is 
correct.
Or is it unnecessary to check the msg since this test is for the retry 
times?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-08 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38909392
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.DataOutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
--- End diff --

You're right, you need to capture the input to the collect method. How 
about using an `ArgumentCaptor`? 
http://mockito.googlecode.com/svn/tags/1.8.0/javadoc/org/mockito/ArgumentCaptor.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-08 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-138518846
  
@mxm
Hi, I added the ArgumentCaptor to the test and removed the unwanted code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-138608134
  
Nice :) Merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/992


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-07 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-138268476
  
@mxm 
@StephanEwen 
Hi, very sorry for bothering.
I got the CI passed.
Is there any new comment or this can be merged?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-04 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-137745160
  
@mxm
Ok, I make the CI rerun.
Any new comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-03 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-137372443
  
I think there is an issue with Travis at the moment. Could you force push 
to this branch again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136910004
  
@mxm @StephanEwen 
Hi, I just update one commit yesterday.
And I found that a few PRs got the same trouble yesterday.
Is there any issue in CI?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38409485
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   static int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retries;
 
private volatile boolean isRunning;
 
public SocketTextStreamFunction(String hostname, int port, char 
delimiter, long maxRetry) {
+   this.retries = 0;
--- End diff --

Initialization to `0` is not needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38414236
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   static int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retries;
 
private volatile boolean isRunning;
 
public SocketTextStreamFunction(String hostname, int port, char 
delimiter, long maxRetry) {
+   this.retries = 0;
--- End diff --

Hi,
I removed this in my new commit.
Otherwise why doesn`t the CI run?
This happen in all of my new commits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136754340
  
@HuangWHWHW Thank you for addressing my comments. Could you please squash 
your commits and force push to this branch again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136739055
  
I am not sure why the CI is not retesting this.

Can you try to squash your commits into one commit and force-push this 
branch? This always triggers CI for me...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-31 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136285250
  
@mxm
Hi, max.
Any comment about my new changes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-27 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-135429969
  
@mxm 
Hi,
I add a test that first send a message to the SocketTextStreamFunction and 
this is success.
Then I close the server let the SocketTextStreamFunction retry.
Is it you need?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-27 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-135351424
  
While merging your pull request I noticed that the 
`SocketTextStreamFunction` actually does not wait the time specified in 
`CONNCTION_RETRY_SLEEP` but immediately tries to reconnect in case of an EOF. 
It only waits in case of a `ConnectionError`. I'm not sure whether this 
behavior is desired but this should also be reflected in your test cases. Could 
you add a test case where you first pass a Socket with an EOF and then let an 
`ConnectionError` occur?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-26 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37963338
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) throws 
Exception {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+
+   Field field = 
SocketTextStreamFunction.class.getDeclaredField(CONNECTION_RETRY_SLEEP);
+   field.setAccessible(true);
+   Field modifiersField = 
Field.class.getDeclaredField(modifiers);
+   modifiersField.setAccessible(true);
+   modifiersField.setInt(field, field.getModifiers()  
~Modifier.FINAL);
+   field.set(null, 200);
--- End diff --

...
Sorry for that.
I changed this in my new one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37959211
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   public static int CONNECTION_RETRY_SLEEP = 1000;
--- End diff --

Ok no problem. Then make the variable non-final but don't expose it. So 
just have it `static int CONNECTION_RETRY_SLEEP = 1000`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134902558
  
@HuangWHWHW Thanks for your changes. Adding reflection calls to the testing 
codes is not good practice and makes the code hard to maintain.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37959366
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) throws 
Exception {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+
+   Field field = 
SocketTextStreamFunction.class.getDeclaredField(CONNECTION_RETRY_SLEEP);
+   field.setAccessible(true);
+   Field modifiersField = 
Field.class.getDeclaredField(modifiers);
+   modifiersField.setAccessible(true);
+   modifiersField.setInt(field, field.getModifiers()  
~Modifier.FINAL);
+   field.set(null, 200);
--- End diff --

That's quite a hack. I think it is ok to remove the `final` modifier and 
make the field variable package-local: `static int CONNECTION_RETRY_SLEEP = 
1000`. Then you can set it directly here. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-26 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-135262313
  
Hi,@mxm
any new comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37879401
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   public static int CONNECTION_RETRY_SLEEP = 1000;
--- End diff --

This shouldn't be modifiable by everyone. Please make it just 
package-visible by removing the `public` modifier. Also, please keep the 
`final` modifier because the current implementation just lets the number of 
retries be configurable with a fixed 1 second retry rate. This is also 
documented in the user-facing API methods on DataStream.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134797100
  
@mxm 
Hi,I chage the CONNECTION_RETRY_SLEEP to static final int 
CONNECTION_RETRY_SLEEP = 1000;
But I have no idea to straightly changing the CONNECTION_RETRY_SLEEP in my 
test using:
SocketTextStreamFunction.CONNECTION_RETRY_SLEEP = 200.
So, I add a reflection mechanism to resolve this.
And now the CONNECTION_RETRY_SLEEP changes to 200 in my test.
Would you please to take a look whether it is correct?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37843364
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Socket channel;
+   

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37845490
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Socket channel;
+   

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134526309
  
Hi, I take a new change that decrease the sleep time and clear the error 
for each test cases.
But I have no idea to control the value of retry to increase.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37844142
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Socket channel;
+   channel = 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37856724
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37937505
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   public static int CONNECTION_RETRY_SLEEP = 1000;
--- End diff --

But if I add the final, it will be a error in my test:
Cannot assign a value to final variable CONNECTION_RETRY_SLEEP.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134493928
  
@mxm 
It doesn`t matter.
I`ll take a new change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37861790
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134570319
  
Hi, I decreased both the waiting time and the retry times since it will 
still cost over 10 seconds if only the waiting time is decreased due to the 
Thread.sleep(CONNECTION_RETRY_SLEEP);.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37860526
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37860912
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134567337
  
Looks good to merge if we further adjust the waiting time of the tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37858063
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37859069
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37860800
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37864215
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37748868
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
--- End diff --

Did you have to add the sleep here to make the test pass? 10 seconds is 
quite long.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37749287
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Socket channel;
+   channel = 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134199343
  
@HuangWHWHW Sorry for keeping you waiting. I've made some more comments. 
Otherwise, I think this looks ready to be merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37749598
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Socket channel;
+   channel = 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37748844
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Socket channel;
+   channel = 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37748808
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Socket channel;
+   channel = 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37762121
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
--- End diff --

You should clear the error for subsequent test cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-20 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-132982117
  
Hi Max,
I am very sorry to bothered you.
I fixed some of my PRs and was waiting for your reply for days.
Otherwise, as your advice, I added a change about the sink retry and take 
tests for it.
I would be very honored if you can spend a little time to take a look about 
these PRs.
Thank you!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-20 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-133238272
  
@tillrohrmann 
Thank you for the info :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-20 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-132999197
  
@HuangWHWHW, Max is currently on vacations. He'll be back next week. I'm 
sure that he'll get back to you then :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-14 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130999638
  
Hi , I did a new change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955259
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -69,14 +68,14 @@ public void run(SourceContextString ctx) throws 
Exception {
 
public void streamFromSocket(SourceContextString ctx, Socket socket) 
throws Exception {
try {
-   StringBuffer buffer = new StringBuffer();
+   StringBuilder buffer = new StringBuilder();
BufferedReader reader = new BufferedReader(new 
InputStreamReader(
socket.getInputStream()));
 
while (isRunning) {
-   int data;
+   String data;
try {
-   data = reader.read();
+   data = reader.readLine();
--- End diff --

Please use `read()` because of the custom delimiter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955189
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -40,10 +37,12 @@
private char delimiter;
private long maxRetry;
private boolean retryForever;
+   private boolean isRetrying = false;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
 
+   private volatile boolean isExit = false;
--- End diff --

Is this flag necessary? We have `isRunning` already.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955316
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -85,11 +84,12 @@ public void streamFromSocket(SourceContextString ctx, 
Socket socket) throws Ex
}
}
 
-   if (data == -1) {
+   if (data == null) {
socket.close();
long retry = 0;
boolean success = false;
-   while (retry  maxRetry  !success) {
+   while ((retry  maxRetry || 
(retryForever  !isExit))  !success) {
+   isRetrying = true;
--- End diff --

This flag is only necessary for your test and thus should be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955334
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -145,4 +152,8 @@ public void cancel() {
}
}
}
+
+   public boolean getIsRetrying() {
--- End diff --

Please remove this getter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955221
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -69,14 +68,14 @@ public void run(SourceContextString ctx) throws 
Exception {
 
public void streamFromSocket(SourceContextString ctx, Socket socket) 
throws Exception {
--- End diff --

I think this method should be private because it is not meant to be used 
outside this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130644950
  
`StringBuilder` is only for single-threaded while `StringBuffer` enables 
multi-thread access. If you use `StringBuffer` in a single-threaded scenario it 
has worse performance than `StringBuilder`.

Thanks for you changes. In addition to the infinity test, can you add a 
test that checks for a certain number of retries (e.g. 10)? Also please add a 
check for 1 and 0 retries. It's always good to test corner cases :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130588652
  
Thanks for your changes. I think we should use `read()` instead of 
`readLine()` because we are using a custom delimiter and not necessarily \n 
(newline symbol). The danger of reading an entire line is that the newline 
symbol might never arrive. So it might continue to read forever. And even if it 
manages to find a newline symbol, you have to truncate your input to find the 
custom delimiter. That's not very efficient. Can you change the code back to 
using the `read()` method? I think we had a misunderstanding.

For you test case: It's not considered good practice to mix production and 
test code. You're doing that by introducing the `isRetrying` flag and exposing 
it. Alternatively, you have two options:

1. Create a `ServerSocket` and pass its address to the 
`SocketTextStreamFunction`. Then control the connection to this socket and 
count how often the function reconnects (e.g. use the `accept()` method).
2. Create your test in the same package as the `SocketTextStreamFunction` 
function (package is `org.apache.flink.streaming.api.functions.source`). Then 
you can access all field variables which are protected. So make your `retries` 
variable a protected field variable of the `SocketTextStreamFunction` class.

I hope that this helps you. If not, feel free to ask more questions.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130524804
  
@mxm 
Hi, I fixed the StringBuffer and add the test.
Take a look whether it`s correct.
Thank you!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130626843
  
Hi Max,
I fixed all as your reviews.
And I retained the change of StringBuffer to StringBuilder.
There is a question that as I see the StringBuilder just do the same thing 
as StringBuffer currently.
So what`s the real different the two type in the SocketTextStreamFunction?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130664351
  
@mxm 
Otherwise, I found the SocketClientSink didn`t have the retry.
Is it necessary to get a retry?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973279
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+   source.start();
+
+   Socket channel;
+   channel = serverSo.accept();
+   channel.close();
+   

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973254
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+
+   SourceFunction.SourceContextString ctx = new 
SourceFunction.SourceContextString() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count  100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+   source.start();
+
+   Socket channel;
+   channel = serverSo.accept();
+   channel.close();
+   

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973430
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --

You should also initialize the variable with 0 in the `open()` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973377
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --

Sorry, minor thing but it should be `retries`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37042816
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --


Sorry for my English. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130663511
  
Ok, I add two cases(retry 10 and 0) since I thought retry 1 time just same 
as 10.
And would you please take a look with another two 
tests(https://github.com/apache/flink/pull/991   and  
https://github.com/apache/flink/pull/977)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130672880
  
 Otherwise, I found the SocketClientSink didn`t have the retry.
Is it necessary to get a retry?

Yes, that might be an issue but let's keep it separate from our concern 
here. If you want, you can open a JIRA issue for the missing retry option in 
the `SocketClientSink`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-12 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130217420
  
@mxm 
Hi, thank you for suggestions.
I will try to follow your suggestions and improve the test.
I understand almost of yours and I also read the Class documentation of 
BufferedReader.read().
When I was doing the test I found the BufferedReader.read() would never 
stop until it read next char from socket server or throw a Exception when 
socket is closed.
Returning -1 in BufferedReader.read() seems to be only worked in text file 
instead socket message.
And I looked for help in the net that some guys said you might add a 
method(Socket.setSoTimeout()) so the BufferedReader.read() would stop.
But this way is not satisfied neither since it would throw a exception.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-12 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130213092
  
@HuangWHWHW `read()` method of the `BufferedReader` object returns `-1` in 
case the end of the stream has been reached.

A couple of things I noticed apart from the `retryForever` issue. I wonder 
if we can fix these with this pull request as well:

1. The control flow of the `streamFromSocket` function is hard to predict 
because there are many `while` loops with `break`, `continue`, or `throw` 
statements.
2. We could use `StringBuilder` instead of `StringBuffer` in this class. 
`StringBuilder` is faster in the case of single-threaded access.
3. The function reads a single character at a time from the socket. It is 
more efficient to use a buffer and read several characters at once.

@HuangWHWHW You asked how you could count the number of retries in a unit 
test. Typically, you would insert a `Mock` or a `Spy` into your test method. 
Unfortunately, this does not work here because the socket variables is 
overwritten in case of a retry. So for this test, I would recommend creating a 
local `ServerSocket` and let the function connect to this socket. You can then 
control the failures from your test socket. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-12 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130222078
  
Actually point 3 is not so bad because we're using a buffered reader that 
fills the buffer and does not read a character from the socket on every call to 
`read()`.

The `read()` method may throw an Exception or return -1. So we need to 
handle both of these cases. If closed properly, the socket should send the EOF 
event and the `read()` method returns -1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-12 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130206658
  
@mxm 
@StephanEwen 
Hi, I do a test for this today and I got another problem.
The SocketTextStreamFunction use BufferedReader.read() to get the buffer 
which is sent by socket server.
And whether this function BufferedReader.read() will never return -1 as the 
end of the sent message?
If it was there should be another bug that code following will never be 
reachable:

if (data == -1) {
socket.close();
long retry = 0;
boolean success = false;
while ((retry  maxRetry || 
retryForever)  !success) {
if (!retryForever) {
retry++;
}
LOG.warn(Lost connection to 
server socket. Retrying in 
+ 
(CONNECTION_RETRY_SLEEP / 1000) +  seconds...);
try {
socket = new Socket();
socket.connect(new 
InetSocketAddress(hostname, port),

CONNECTION_TIMEOUT_TIME);
success = true;
} catch (ConnectException ce) {

Thread.sleep(CONNECTION_RETRY_SLEEP);
socket.close();
}
}

if (success) {
LOG.info(Server socket is 
reconnected.);
} else {
LOG.error(Could not reconnect 
to server socket.);
break;
}
reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
continue;
}


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-12 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130228505
  
Hi, there are two more questions:
1.In using StringBuilder, does it mean that we should use 
BufferedReader.readLine() instead of BufferedReader.read()?
2.Could you tell me how to make the BufferedReader.read() return -1? I 
tried many ways that all filed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-12 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130237642
  
 1.In using StringBuilder, does it mean that we should use 
BufferedReader.readLine() instead of BufferedReader.read()?

Reading by character is the way to go if we use a custom `delimiter`. If 
our delimiter was `\n` then it would be ok to read entire lines.

  Could you tell me how to make the BufferedReader.read() return -1? I 
tried many ways that all filed.

Ok :) Here is a minimal working example where `read()` returns `-1`:

```java
public static void main(String[] args) throws IOException {

ServerSocket socket = new ServerSocket(12345);

final SocketAddress socketAddress = socket.getLocalSocketAddress();

new Thread(new Runnable() {
@Override
public void run() {
Socket socket = new Socket();

try {
socket.connect(socketAddress);
} catch (IOException e) {
e.printStackTrace();
}

try {
BufferedReader bufferedReader = new 
BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println((bufferedReader.read()));
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();

Socket channel = socket.accept();

channel.close();
}
```
Output:
```
-1
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-12 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130311909
  
Thank you!
I`ll try again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-11 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-129864531
  
@HuangWHWHW `retryForever` is just a convenience variable for `maxRetry  
0`. Your fix is correct because the loop will only execute if `maxRetry  0` 
and thus not execute at all if it should retry forever. It would be great if 
you added a test that checks for the correct number of retries. In case of 
infinite retries, just check up to a certain number (e.g. 100 retries).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-11 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-129875331
  
@mxm 
Ok, I`ll add a test.
There is a little difficult that I can`t get the retry times in test since 
the retry is a local variable.
So can I add a function to get the retry times?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-10 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-129571942
  
This fix looks valid.

Can it be included in an extended test for the socket function? Something 
that validates that the function properly tries to reconnect?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-10 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-129667322
  
@StephanEwen 
Hi,yes,I plan to add a test for it.
However, the test may be failed since the retryForever in the flink-master 
is also unworked currently.
Will the test I`d add run together with this PR?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-06 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-128309202
  
If you think it was necessary why was your first step to remove it's 
usage...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-06 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-128322151
  
Hah
Sorry, this thought was generated after this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-06 Thread HuangWHWHW
GitHub user HuangWHWHW opened a pull request:

https://github.com/apache/flink/pull/992

[FLINK-2490][FIX]Remove the retryForever check in function streamFrom…

In the class SocketTextStreamFunction, the var retryForever only  be set in 
 the line this.retryForever = maxRetry  0;(SocketTextStreamFunction.java:54).
When the program executes this “while (retry  maxRetry  !success) ” 
it means the maxRetry  0 and the retryForever will always be false.
So it`s unnecessary to judge whether retryForever be false.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HuangWHWHW/flink FLINK-2490

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/992.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #992


commit 25a41d0f462add2f11c6830ec8db518702f8dbb5
Author: HuangWHWHW 404823...@qq.com
Date:   2015-08-06T08:55:24Z

[FLINK-2490][FIX]Remove the retryForever check in function streamFromSocket




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-06 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-128307751
  
Yes, I understand you.
But I think the retryForever is necessary.
Maybe there is a bug that make the retryForever not working.
I`ll get another fix after the CI.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-06 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-128301542
  
if you remove that check, retryForever is unused and can be removed 
completely.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---