[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14902485#comment-14902485
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-142272588
  
@HuangWHWHW No problem, we all learn all the time.
It would only help to review and merge pull requests if the style follows 
more the Java best practices. It is something that you will learn fast, I am 
sure.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900358#comment-14900358
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-141904245
  
Hi, very sorry for bothering again.
Since two weeks passed, do you have some time to review this PR recently?
Will greatly appreciate it:)


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900555#comment-14900555
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-141952198
  
Looks good now, will merge this...


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14901885#comment-14901885
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-142173138
  
@StephanEwen 
Hi, I'm very sorry for the poor Java style of mine.
And many thanks for your rework.I did a full review about your new fixes 
and get the points.
I'll be more careful next time!
And also thanks for the book.I'm doing more studies from now on.
Generally, thanks for the time very much!


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900816#comment-14900816
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-142014348
  
I reworked this quite heavily during merging. There were a lot of issues 
that were against good Java style:
  - Variables in the classes, rather than in methods
  - The way references to threads were obtained
  - Defining clear parameter checks and exceptions
  - Handling InterruptedExceptions
  - polling versus clear conditions when state can be checked

You can have a look at the code after my fixes, to see these issues in 
context.

I would suggest to get a Java book (like "Effective Java", that is a good 
one) and take this as a guideline for future work. This pull request took more 
than 70 comments and still needed quite some rework (not for Flink-specific 
issues, but all of it general Java style/efficiency/correctness). I am afraid 
we cannot do that for every pull request, it would be completely overwhelming...


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900814#comment-14900814
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1030


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736489#comment-14736489
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r39020317
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

I see, that is good. For that to be safe, though, the lock should be 
initialized with the class.

In general, it is good practice to only use final references as locks. If 
the class needs to be serializable, use the `SerializableObject`.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736495#comment-14736495
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-138842285
  
@tillrohrmann 
Ah,sorry for bothering.
It doesn't matter.
Just I thought I did something wrong in the community.
:-D


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736499#comment-14736499
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-138843852
  
No worries. We are simply overloaded right now. Many hard features under 
development, and many pull requests being opened.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736481#comment-14736481
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-138836978
  
Sorry, @HuangWHWHW, currently we're really busy. I'll try to review your PR 
once I find a free minute.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736493#comment-14736493
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-138841521
  
I think this looks good, except for the comment with the final variable for 
the lock.

One more comment: When concatenating strings, avoid constructs like `" 
value=" + value.toString()`. Rather do `"value=" + value`. That is safe against 
null pointers.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736473#comment-14736473
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-138832801
  
@StephanEwen 
@tillrohrmann 
Hallo?


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738058#comment-14738058
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-139101747
  
Sorry for careless.
I forget to change the git global user.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738091#comment-14738091
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-139104928
  
I removed the toString() method and change the lock to `private final 
SerializableObject lock`.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14730865#comment-14730865
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-137747552
  
@StephanEwen 
@tillrohrmann
Hi,
I get the CI to rerun.
Any new comment?


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725275#comment-14725275
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410693
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

The idea was to speed up the proper termination of the `SocketClientSink` 
upon calling `closeConnection`. Otherwise one would have to wait a complete 
`CONNECTION_RETRY_SLEEP` cycle.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725221#comment-14725221
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408571
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

The `lock` part can probably be replaced by a simple `Thread.sleep(...)`.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725223#comment-14725223
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408757
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
+   lock = new Object();
+   }
+
+   try {
+   synchronized (lock) {
+   
lock.wait(CONNECTION_RETRY_SLEEP);
+   }
+   } catch(InterruptedException eee) {
+   LOG.error(eee.toString());
--- End diff --

I think there is no need to log this. Interrupting the thread usually means 
that it has been shut down.

If you want to log exceptions, log them via ("message", exeption), here 
`LOG.error("Reconnect delay interrupted", e);`


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725277#comment-14725277
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410780
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

Ah, this is a comment of @tillrohrmann.
See this:

![image](https://cloud.githubusercontent.com/assets/13193847/9603561/04dad2b4-50e4-11e5-96e3-143d4d5084bf.png)



> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725280#comment-14725280
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410820
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
+   lock = new Object();
+   }
+
+   try {
+   synchronized (lock) {
+   
lock.wait(CONNECTION_RETRY_SLEEP);
+   }
+   } catch(InterruptedException eee) {
+   LOG.error(eee.toString());
--- End diff --

Ok, I`ll remove it.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725220#comment-14725220
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408542
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
--- End diff --

You can log the exception simpler like this:
```
LOG.error("Cannot send message " + value + " to socket server at " + 
hostName + ":" + port + ". Trying to reconnect.", e);
`´`


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14718044#comment-14718044
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-135630392
  
The method notifyAll() maybe get a bug in my test file that sometimes it 
will be called before the wait().
So the method wait() will get stuck.
Then I change this to a sub-thread that using join() to avoid it.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14712460#comment-14712460
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-134822139
  
@tillrohrmann 
Hi, I fix the conflict and get the CI rerun.
Would you please to take a look about my new changes?
Whether there will be some new comments?


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14710815#comment-14710815
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-134513103
  
@tillrohrmann
Hi, I take a new fix.
But this:

![image](https://cloud.githubusercontent.com/assets/13193847/9461264/eccf96cc-4b3f-11e5-8d08-19ecd83eff7c.png)
I have no good idea to care both the sink need to retry and should not 
finished retry when I reopen the socket server.
So, I change the test testSocketSinkRetryAccess” from retry ten times to 
retry forever since this will never finished retry until the method 
closeConnection() is called or the reconnect is success.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14710989#comment-14710989
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-134538764
  
@tillrohrmann
BTW:How to make the CI rerun?


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14711200#comment-14711200
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-134573477
  
It should be re-triggered every time you push something new to your branch.
For your local CI you should be able to manually restart a build by going
to travis and click the restart button.

On Tue, Aug 25, 2015 at 11:29 AM, HuangWHWHW notificati...@github.com
wrote:

 @tillrohrmann https://github.com/tillrohrmann
 BTW:How to make the CI rerun?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/flink/pull/1030#issuecomment-134538764.




 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14711220#comment-14711220
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-134577603
  
@tillrohrmann 
Ok, I will do a update in my branch.
But I cannot go to travis since it is blocked in China.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709237#comment-14709237
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37748350
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+  

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709250#comment-14709250
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37748875
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709173#comment-14709173
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37745480
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709175#comment-14709175
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37745586
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709241#comment-14709241
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37748510
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+  

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709320#comment-14709320
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37752742
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +85,49 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   Log.error(Cannot send message  + value.toString() +
+to socket server at  + hostName + 
: + port + . Caused by  + e.toString() +
+   . Trying to reconnect.);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   Log.error(Reconnect to socket server 
and send message failed. Caused by  +
+   ee.toString() + 
. Retry time(s): + retries);
+   synchronized (this) {
+   
this.wait(CONNECTION_RETRY_SLEEP);
--- End diff --

This can throw an `InterruptedException`. Should be treated by checking the 
termination criterion for example.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709158#comment-14709158
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37744832
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709160#comment-14709160
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37744884
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709157#comment-14709157
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37744808
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709159#comment-14709159
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37744849
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709168#comment-14709168
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37745343
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709171#comment-14709171
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37745469
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709232#comment-14709232
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37748110
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+  

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709285#comment-14709285
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37750633
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +85,49 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   Log.error(Cannot send message  + value.toString() +
+to socket server at  + hostName + 
: + port + . Caused by  + e.toString() +
+   . Trying to reconnect.);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   Log.error(Reconnect to socket server 
and send message failed. Caused by  +
+   ee.toString() + 
. Retry time(s): + retries);
+   synchronized (this) {
--- End diff --

Locking on `this` is usually not recommended because outside code can 
influence your behaviour here. Thus, use a private lock field here.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709286#comment-14709286
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37750694
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -88,6 +136,7 @@ public void invoke(IN value) {
 */
private void closeConnection(){
try {
+   isRunning = false;
--- End diff --

You should wake-up the waiting thread here. Otherwise waiting on a lock 
does not make any sense.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709304#comment-14709304
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37751617
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709305#comment-14709305
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37751723
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709161#comment-14709161
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37744939
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709166#comment-14709166
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37745169
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709228#comment-14709228
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37747966
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+  

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709315#comment-14709315
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37752541
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709153#comment-14709153
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37744540
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
--- End diff --

I think it's good to set the error to `null` again. Otherwise all 
subsequent tests will fail with the same error message which is, however, 
completely unrelated.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709177#comment-14709177
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-134175239
  
Hi @HuangWHWHW, I had some comments concerning the test cases.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709233#comment-14709233
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37748179
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+  

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709243#comment-14709243
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-134196700
  
Hi,
Thank you.
I`ll update a new fix.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709245#comment-14709245
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37748764
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709302#comment-14709302
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37751549
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709003#comment-14709003
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-134107315
  
@tillrohrmann 
Hi,
I take a new fix and add a test for retry success.
Would you please to take a look?
Thank you.

BTW:Why does not the CI rerun??


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14710417#comment-14710417
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37824613
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+  

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14710403#comment-14710403
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37824211
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+  

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14710375#comment-14710375
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37823131
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+   final AtomicReferenceThrowable error = new 
AtomicReferenceThrowable();
+   private final String host = 127.0.0.1;
+   private int port;
+   private String value;
+
+   public Thread t;
+
+   public SocketClientSinkTest() {
+   }
+
+   @Test
+   public void testSocketSink() throws Exception{
+   value = ;
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   
simpleSink.invoke(testSocketSinkInvoke);
+   simpleSink.close();
+   } catch (Exception e){
+   error.set(e);
+   }
+   }
+   }).start();
+
+   Socket sk = server.accept();
+   BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+   .getInputStream()));
+   value = rdr.readLine();
+
+   t.join();
+   server.close();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail(Error in spawned thread:  + t.getMessage());
+   }
+
+   assertEquals(testSocketSinkInvoke, value);
+   }
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+  

[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706844#comment-14706844
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37643848
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
assertEquals(Connected, this.access);
assertEquals(testSocketSinkInvoke, value);
}
+
+   public Thread t;
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   Thread.sleep(1);
--- End diff --

Yes, I understand you.
I`ll take a change :-)


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706434#comment-14706434
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37617867
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   }catch(Exception ee){
--- End diff --

Well, it actually depends on the exceptions for which we want to restart. 
If I'm not mistaken, then `new Socket()` and `dataOutputStream.write` only 
throw `IOExceptions`. Thus, we should change it.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706436#comment-14706436
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37618025
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
assertEquals(Connected, this.access);
assertEquals(testSocketSinkInvoke, value);
}
+
+   public Thread t;
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   Thread.sleep(1);
--- End diff --

To be honest, I'm not a big fan of `sleep` based synchronization. Too often 
these kind of tests have failed on Travis. Usually if you use sleeps, the 
interval is either to short to allow different interleavings if you have bad 
luck or they are too long which makes the test slow. Therefore, I'd propose a 
simple wait object on which you wait from within the thread. Once you've closed 
the server socket, you can trigger the `notifyAll` method on this wait object 
to let the thread continue.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706059#comment-14706059
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37599021
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   }catch(Exception ee){
+
+   Thread.sleep(CONNECTION_RETRY_SLEEP);
+   continue;
--- End diff --

Sorry about a careless.
Just remove it.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706058#comment-14706058
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37598999
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   }catch(Exception ee){
--- End diff --

Yes, it depends on you.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706060#comment-14706060
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37599157
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
assertEquals(Connected, this.access);
assertEquals(testSocketSinkInvoke, value);
}
+
+   public Thread t;
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   Thread.sleep(1);
--- End diff --

This is waiting for the socket server close.



 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706061#comment-14706061
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-133238105
  
 Could we also add a test case where we test that the SocketClientSink can 
reconnect against a newly opened socket after it has been closed? This would be 
great.

Good idea!
I will try to do it.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705502#comment-14705502
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37563984
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   }catch(Exception ee){
+
+   Thread.sleep(CONNECTION_RETRY_SLEEP);
--- End diff --

Maybe it's also better to use a `synchronized(lock) { 
lock.wait(CONNECTION_RETRY_SLEEP) }` here. That way we can call in the 
`closeConnection` method the `lock.notifyAll` method to directly wake all 
sleeping threads up. This will speed up the shutdown procedure.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705487#comment-14705487
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37563493
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -16,22 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.functions;
+package org.apache.flink.streaming.api.functions.sink;
 
-import java.io.IOException;
+import java.io.*;
--- End diff --

We don't use star imports.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705467#comment-14705467
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37562745
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   }catch(Exception ee){
--- End diff --

We usually add whitespaces between keywords.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705475#comment-14705475
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37562991
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   }catch(Exception ee){
+
+   Thread.sleep(CONNECTION_RETRY_SLEEP);
+   continue;
--- End diff --

Why continue here?


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705482#comment-14705482
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37563275
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
--- End diff --

I think we should at least log the exception.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705527#comment-14705527
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-133111629
  
Hi @HuangWHWHW, I had some minor comments. Could we also add a test case 
where we test that the `SocketClientSink` can reconnect against a newly opened 
socket after it has been closed? This would be great.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705486#comment-14705486
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37563424
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   }catch(Exception ee){
+
+   Thread.sleep(CONNECTION_RETRY_SLEEP);
--- End diff --

We should definitely log the exception so that the user can understand why 
the sink had problems.


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705485#comment-14705485
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37563322
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +84,44 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException(Cannot send message  + 
value.toString() +
-to socket server at  + hostName + 
: + port, e);
+   retries = 0;
+   boolean success = false;
+   while ((retries  maxRetry || retryForever)  !success 
 isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null  
!client.isClosed()) {
+   client.close();
+   }
+
+   if (!retryForever){
+   retries++;
+   }
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   }catch(Exception ee){
--- End diff --

Do we really want to catch all exceptions here? Maybe `IOException` is 
enough?


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14705504#comment-14705504
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r37564115
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 ---
@@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
assertEquals(Connected, this.access);
assertEquals(testSocketSinkInvoke, value);
}
+
+   public Thread t;
+
+   @Test
+   public void testSocketSinkNoRetry() throws Exception{
+   ServerSocket server = new ServerSocket(0);
+   port = server.getLocalPort();
+
+   new Thread(new Runnable() {
+
+   @Override
+   public void run() {
+   t = Thread.currentThread();
+   SerializationSchemaString, byte[] 
simpleSchema = new SerializationSchemaString, byte[]() {
+   @Override
+   public byte[] serialize(String element) 
{
+   return element.getBytes();
+   }
+   };
+
+   try {
+   SocketClientSinkString simpleSink = 
new SocketClientSinkString(host, port, simpleSchema, 0);
+   simpleSink.open(new Configuration());
+   Thread.sleep(1);
--- End diff --

Why do we have to sleep here?


 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699265#comment-14699265
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

GitHub user HuangWHWHW opened a pull request:

https://github.com/apache/flink/pull/1030

[FLINK-2536][streaming]add a re-connect for socket sink

add a re-connect in function invoke() when it throws exception.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HuangWHWHW/flink FLINK-2536

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1030.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1030


commit 85d5bb50419d6b803a9fc966dd4f95fcd042e21c
Author: HuangWHWHW 404823...@qq.com
Date:   2015-08-17T09:32:04Z

[FLINK-2536][streaming]add a re-connect for socket sink




 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn`t use a re-connect when disconnect from 
 the socket server or get exception.
 I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)