[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14902485#comment-14902485 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-142272588

    @HuangWHWHW No problem, we all learn all the time. It would only help to review and merge pull requests if the style follows more the Java best practices. It is something that you will learn fast, I am sure.

> Add a retry for SocketClientSink
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I found the SocketClientSink doesn't use a re-connect when disconnect from
> the socket server or get exception.
> I'd like to add a re-connect like socket source for socket sink.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900358#comment-14900358 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-141904245

    Hi, very sorry to bother you again. Two weeks have passed; do you have some time to review this PR? I would greatly appreciate it :)
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900555#comment-14900555 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-141952198

    Looks good now, will merge this...
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14901885#comment-14901885 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-142173138

    @StephanEwen
    Hi, I'm very sorry for my poor Java style, and many thanks for your rework. I did a full review of your fixes and got the points. I'll be more careful next time!
    Thanks also for the book recommendation. I'll be studying more from now on.
    Overall, thank you very much for your time!
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900816#comment-14900816 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-142014348

    I reworked this quite heavily during merging. There were a lot of issues that were against good Java style:
      - Variables in the classes, rather than in methods
      - The way references to threads were obtained
      - Defining clear parameter checks and exceptions
      - Handling InterruptedExceptions
      - polling versus clear conditions when state can be checked

    You can have a look at the code after my fixes, to see these issues in context.

    I would suggest to get a Java book (like "Effective Java", that is a good one) and take this as a guideline for future work. This pull request took more than 70 comments and still needed quite some rework (not for Flink-specific issues, but all of it general Java style/efficiency/correctness). I am afraid we cannot do that for every pull request, it would be completely overwhelming...
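As an illustration of the "clear parameter checks and exceptions" item in the list above, here is a minimal sketch. It is not the code that was merged: the class `SocketSinkSettings`, its accessors, the accepted port range, and the convention that `-1` means "retry forever" are all assumptions made for this example.

```java
import java.util.Objects;

/** Hypothetical settings holder; the names are illustrative, not Flink's API. */
final class SocketSinkSettings {
    private final String hostName;
    private final int port;
    private final int maxNumRetries;

    /**
     * @param maxNumRetries number of reconnect attempts; -1 means "retry forever"
     *                      (an assumption made for this sketch)
     */
    SocketSinkSettings(String hostName, int port, int maxNumRetries) {
        // Fail fast, with a message that names the offending argument.
        this.hostName = Objects.requireNonNull(hostName, "hostName must not be null");
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port is out of range: " + port);
        }
        if (maxNumRetries < -1) {
            throw new IllegalArgumentException(
                    "maxNumRetries must be >= -1, but was " + maxNumRetries);
        }
        this.port = port;
        this.maxNumRetries = maxNumRetries;
    }

    String hostName() { return hostName; }
    int port() { return port; }
    int maxNumRetries() { return maxNumRetries; }
}
```

Checking arguments in the constructor means a bad configuration is reported where it is created, rather than later as a confusing failure inside the retry loop.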
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900814#comment-14900814 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1030
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736489#comment-14736489 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r39020317

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
          * The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            LOG.error("Cannot send message " + value.toString() +
    +                    " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +                    ". Trying to reconnect.");
    +            retries = 0;
    +            boolean success = false;
    +            while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                try {
    +
    +                    if (dataOutputStream != null) {
    +                        dataOutputStream.close();
    +                    }
    +
    +                    if (client != null && !client.isClosed()) {
    +                        client.close();
    +                    }
    +
    +                    retries++;
    +
    +                    client = new Socket(hostName, port);
    +                    dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                    dataOutputStream.write(msg);
    +                    success = true;
    +
    +                } catch(IOException ee) {
    +                    LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +                            ee.toString() + ". Retry time(s):" + retries);
    +
    +                    if (lock == null) {
    --- End diff --

    I see, that is good. For that to be safe, though, the lock should be initialized with the class. In general, it is good practice to only use final references as locks. If the class needs to be serializable, use the `SerializableObject`.
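The advice about final lock references can be sketched roughly as follows. This is an illustration under assumptions, not the merged code: `ConnectionState` and its methods are hypothetical names, and a plain `Object` is used as the lock, so the sketch does not cover the serializable case for which the comment recommends `SerializableObject`.

```java
/** Hypothetical class; only the lock field matters for this sketch. */
final class ConnectionState {
    // Final and created together with the instance. A lazily created lock
    // ("if (lock == null) lock = new Object()") could be observed as two
    // different objects by two threads, which would defeat the synchronization.
    private final Object lock = new Object();

    private boolean running = true; // guarded by lock

    boolean isRunning() {
        synchronized (lock) {
            return running;
        }
    }

    void close() {
        synchronized (lock) {
            running = false;
            lock.notifyAll(); // wake up anything waiting on this lock
        }
    }
}
```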
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736495#comment-14736495 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138842285

    @tillrohrmann
    Ah, sorry for bothering you. It doesn't matter. I just thought I had done something wrong in the community. :-D
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736499#comment-14736499 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138843852

    No worries. We are simply overloaded right now. Many hard features under development, and many pull requests being opened.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736481#comment-14736481 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138836978

    Sorry, @HuangWHWHW, currently we're really busy. I'll try to review your PR once I find a free minute.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736493#comment-14736493 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138841521

    I think this looks good, except for the comment with the final variable for the lock.

    One more comment: When concatenating strings, avoid constructs like `" value=" + value.toString()`. Rather do `"value=" + value`. That is safe against null pointers.
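The null-safety point can be shown with a short sketch. The helper `SendFailureMessage.describe` is a hypothetical name used only for this example; the message text follows the one in the diff under review.

```java
/** Hypothetical helper that builds the error text used in the sink. */
final class SendFailureMessage {
    static String describe(Object value, String hostName, int port) {
        // String concatenation turns a null reference into the text "null",
        // whereas value.toString() would throw a NullPointerException.
        return "Cannot send message " + value
                + " to socket server at " + hostName + ":" + port;
    }
}
```

Called with a null value, `describe(null, "localhost", 9999)` still produces a readable message instead of failing while an error is being reported.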
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736473#comment-14736473 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138832801

    @StephanEwen
    @tillrohrmann
    Hello?
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738058#comment-14738058 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-139101747

    Sorry for being careless. I forgot to change the git global user.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738091#comment-14738091 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-139104928

    I removed the toString() calls and changed the lock to `private final SerializableObject lock`.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14730865#comment-14730865 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-137747552

    @StephanEwen
    @tillrohrmann
    Hi, I got the CI to rerun.
    Any new comments?
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725275#comment-14725275 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38410693

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
          * The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            LOG.error("Cannot send message " + value.toString() +
    +                    " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +                    ". Trying to reconnect.");
    +            retries = 0;
    +            boolean success = false;
    +            while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                try {
    +
    +                    if (dataOutputStream != null) {
    +                        dataOutputStream.close();
    +                    }
    +
    +                    if (client != null && !client.isClosed()) {
    +                        client.close();
    +                    }
    +
    +                    retries++;
    +
    +                    client = new Socket(hostName, port);
    +                    dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                    dataOutputStream.write(msg);
    +                    success = true;
    +
    +                } catch(IOException ee) {
    +                    LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +                            ee.toString() + ". Retry time(s):" + retries);
    +
    +                    if (lock == null) {
    --- End diff --

    The idea was to speed up the proper termination of the `SocketClientSink` upon calling `closeConnection`. Otherwise one would have to wait a complete `CONNECTION_RETRY_SLEEP` cycle.
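The difference from a plain `Thread.sleep(...)` is that a timed `wait` on a lock can be cut short by a `notifyAll()` from the closing thread. A minimal sketch of that idea follows, assuming a hypothetical helper class `RetryDelay`. It is not the merged implementation, and its `close()` merely stands in for the role `closeConnection` plays in the comment above.

```java
/** Hypothetical helper: a retry delay that close() can end early. */
final class RetryDelay {
    private final Object lock = new Object();
    private boolean closed; // guarded by lock

    /**
     * Waits up to delayMillis before the next attempt.
     *
     * @return true if the delay elapsed, false if close() or an interrupt ended it early
     */
    boolean awaitNextAttempt(long delayMillis) {
        final long deadline = System.nanoTime() + delayMillis * 1_000_000L;
        synchronized (lock) {
            // Loop on the condition so that spurious wakeups do not shorten the delay.
            while (!closed) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    return true; // delay is over, the caller may retry
                }
                try {
                    lock.wait(remainingMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    return false;
                }
            }
            return false; // closed while waiting
        }
    }

    void close() {
        synchronized (lock) {
            closed = true;
            lock.notifyAll(); // wakes a waiting retry loop immediately
        }
    }
}
```

A retry loop would call `awaitNextAttempt(...)` between attempts and stop as soon as it returns false, so shutdown does not have to sit out the rest of the delay.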
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725221#comment-14725221 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38408571

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
          * The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            LOG.error("Cannot send message " + value.toString() +
    +                    " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +                    ". Trying to reconnect.");
    +            retries = 0;
    +            boolean success = false;
    +            while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                try {
    +
    +                    if (dataOutputStream != null) {
    +                        dataOutputStream.close();
    +                    }
    +
    +                    if (client != null && !client.isClosed()) {
    +                        client.close();
    +                    }
    +
    +                    retries++;
    +
    +                    client = new Socket(hostName, port);
    +                    dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                    dataOutputStream.write(msg);
    +                    success = true;
    +
    +                } catch(IOException ee) {
    +                    LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +                            ee.toString() + ". Retry time(s):" + retries);
    +
    +                    if (lock == null) {
    --- End diff --

    The `lock` part can probably be replaced by a simple `Thread.sleep(...)`.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725223#comment-14725223 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38408757

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
          * The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            LOG.error("Cannot send message " + value.toString() +
    +                    " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +                    ". Trying to reconnect.");
    +            retries = 0;
    +            boolean success = false;
    +            while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                try {
    +
    +                    if (dataOutputStream != null) {
    +                        dataOutputStream.close();
    +                    }
    +
    +                    if (client != null && !client.isClosed()) {
    +                        client.close();
    +                    }
    +
    +                    retries++;
    +
    +                    client = new Socket(hostName, port);
    +                    dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                    dataOutputStream.write(msg);
    +                    success = true;
    +
    +                } catch(IOException ee) {
    +                    LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +                            ee.toString() + ". Retry time(s):" + retries);
    +
    +                    if (lock == null) {
    +                        lock = new Object();
    +                    }
    +
    +                    try {
    +                        synchronized (lock) {
    +                            lock.wait(CONNECTION_RETRY_SLEEP);
    +                        }
    +                    } catch(InterruptedException eee) {
    +                        LOG.error(eee.toString());
    --- End diff --

    I think there is no need to log this. Interrupting the thread usually means that it has been shut down.

    If you want to log exceptions, log them via ("message", exception), here `LOG.error("Reconnect delay interrupted", e);`
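To illustrate the ("message", exception) form in a self-contained way, the sketch below uses `java.util.logging` from the JDK rather than the logger used in the pull request, so the call shape differs slightly: `LOG.log(Level.SEVERE, message, throwable)` takes the place of `LOG.error(message, throwable)`. The class `LoggingExample` and the method `reportSendFailure` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical example class; java.util.logging stands in for the project's logger. */
final class LoggingExample {
    private static final Logger LOG = Logger.getLogger(LoggingExample.class.getName());

    static void reportSendFailure(Object value, String hostName, int port, Exception cause) {
        // The exception is passed as its own argument, so the logger keeps the
        // throwable (including its stack trace) instead of only cause.toString().
        LOG.log(Level.SEVERE,
                "Cannot send message " + value + " to socket server at "
                        + hostName + ":" + port + ". Trying to reconnect.",
                cause);
    }
}
```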
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725277#comment-14725277 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38410780

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
          * The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            LOG.error("Cannot send message " + value.toString() +
    +                    " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +                    ". Trying to reconnect.");
    +            retries = 0;
    +            boolean success = false;
    +            while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                try {
    +
    +                    if (dataOutputStream != null) {
    +                        dataOutputStream.close();
    +                    }
    +
    +                    if (client != null && !client.isClosed()) {
    +                        client.close();
    +                    }
    +
    +                    retries++;
    +
    +                    client = new Socket(hostName, port);
    +                    dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                    dataOutputStream.write(msg);
    +                    success = true;
    +
    +                } catch(IOException ee) {
    +                    LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +                            ee.toString() + ". Retry time(s):" + retries);
    +
    +                    if (lock == null) {
    --- End diff --

    Ah, this was suggested in a comment by @tillrohrmann.
    See this:
    ![image](https://cloud.githubusercontent.com/assets/13193847/9603561/04dad2b4-50e4-11e5-96e3-143d4d5084bf.png)
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725280#comment-14725280 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38410820

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
          * The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            LOG.error("Cannot send message " + value.toString() +
    +                    " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +                    ". Trying to reconnect.");
    +            retries = 0;
    +            boolean success = false;
    +            while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                try {
    +
    +                    if (dataOutputStream != null) {
    +                        dataOutputStream.close();
    +                    }
    +
    +                    if (client != null && !client.isClosed()) {
    +                        client.close();
    +                    }
    +
    +                    retries++;
    +
    +                    client = new Socket(hostName, port);
    +                    dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                    dataOutputStream.write(msg);
    +                    success = true;
    +
    +                } catch(IOException ee) {
    +                    LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +                            ee.toString() + ". Retry time(s):" + retries);
    +
    +                    if (lock == null) {
    +                        lock = new Object();
    +                    }
    +
    +                    try {
    +                        synchronized (lock) {
    +                            lock.wait(CONNECTION_RETRY_SLEEP);
    +                        }
    +                    } catch(InterruptedException eee) {
    +                        LOG.error(eee.toString());
    --- End diff --

    Ok, I'll remove it.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725220#comment-14725220 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38408542

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
          * The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            LOG.error("Cannot send message " + value.toString() +
    --- End diff --

    You can log the exception more simply, like this:
    ```
    LOG.error("Cannot send message " + value +
        " to socket server at " + hostName + ":" + port + ". Trying to reconnect.", e);
    ```
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanelfocusedCommentId=14718044#comment-14718044 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-135630392

    Using notifyAll() in my test file may cause a bug: it is sometimes called before wait(), so wait() gets stuck.
    I changed this to a sub-thread that is awaited with join() to avoid it.
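The problem described above is a lost notification: `notifyAll()` has no effect when nobody is waiting yet, so a later `wait()` never sees it. Besides the `join()` approach chosen in the pull request, one common alternative is a `java.util.concurrent.CountDownLatch`, which remembers that it was signalled. The sketch below assumes a hypothetical test helper named `ReadySignal` and is not taken from the pull request.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: a one-shot signal that cannot be "missed". */
final class ReadySignal {
    private final CountDownLatch ready = new CountDownLatch(1);

    /** May be called before or after await(); the latch keeps its state either way. */
    void signal() {
        ready.countDown();
    }

    /** @return true if signal() was called before the timeout expired */
    boolean await(long timeoutMillis) {
        try {
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Because the latch count stays at zero once `signal()` has run, the order in which the two threads reach their calls no longer matters, and the timeout keeps a broken test from hanging forever.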
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712460#comment-14712460 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/1030#issuecomment-134822139 @tillrohrmann Hi, I fixed the conflict and got the CI to rerun. Would you please take a look at my new changes? Are there any new comments?
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710815#comment-14710815 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/1030#issuecomment-134513103 @tillrohrmann Hi, I pushed a new fix. But regarding this: ![image](https://cloud.githubusercontent.com/assets/13193847/9461264/eccf96cc-4b3f-11e5-8d08-19ecd83eff7c.png) I have no good idea how to handle both requirements: the sink needs to retry, and it should not have finished retrying by the time I reopen the socket server. So I changed the test "testSocketSinkRetryAccess" from retrying ten times to retrying forever, since the sink then never stops retrying until closeConnection() is called or the reconnect succeeds.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710989#comment-14710989 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/1030#issuecomment-134538764 @tillrohrmann BTW: how can I make the CI rerun?
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711200#comment-14711200 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1030#issuecomment-134573477 It should be re-triggered every time you push something new to your branch. For your local CI you should be able to manually restart a build by going to Travis and clicking the restart button.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711220#comment-14711220 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/1030#issuecomment-134577603 @tillrohrmann OK, I will push an update to my branch. But I cannot access Travis since it is blocked in China.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709237#comment-14709237 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37748350
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+	private final String host = "127.0.0.1";
+	private int port;
+	private String value;
+
+	public Thread t;
+
+	public SocketClientSinkTest() {
+	}
+
+	@Test
+	public void testSocketSink() throws Exception{
+		value = "";
+		ServerSocket server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				t = Thread.currentThread();
+				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+					@Override
+					public byte[] serialize(String element) {
+						return element.getBytes();
+					}
+				};
+
+				try {
+					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+					simpleSink.open(new Configuration());
+					simpleSink.invoke("testSocketSinkInvoke");
+					simpleSink.close();
+				} catch (Exception e){
+					error.set(e);
+				}
+			}
+		}).start();
+
+		Socket sk = server.accept();
+		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+				.getInputStream()));
+		value = rdr.readLine();
+
+		t.join();
+		server.close();
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals("testSocketSinkInvoke", value);
+	}
+
+	@Test
+	public void testSocketSinkNoRetry() throws Exception{
+		ServerSocket server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				t = Thread.currentThread();
+				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709250#comment-14709250 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37748875 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709173#comment-14709173 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37745480 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709175#comment-14709175 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37745586 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709241#comment-14709241 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37748510 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709320#comment-14709320 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37752742
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
@@ -73,13 +85,49 @@ public void intializeConnection() {
 	 * The incoming data
 	 */
 	@Override
-	public void invoke(IN value) {
+	public void invoke(IN value) throws Exception {
 		byte[] msg = schema.serialize(value);
 		try {
 			dataOutputStream.write(msg);
 		} catch (IOException e) {
-			throw new RuntimeException("Cannot send message " + value.toString() +
-					" to socket server at " + hostName + ":" + port, e);
+			Log.error("Cannot send message " + value.toString() +
+					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
+					". Trying to reconnect.");
+			retries = 0;
+			boolean success = false;
+			while ((retries < maxRetry || retryForever) && !success && isRunning){
+				try {
+
+					if (dataOutputStream != null) {
+						dataOutputStream.close();
+					}
+
+					if (client != null && !client.isClosed()) {
+						client.close();
+					}
+
+					if (!retryForever){
+						retries++;
+					}
+
+					client = new Socket(hostName, port);
+					dataOutputStream = new DataOutputStream(client.getOutputStream());
+					dataOutputStream.write(msg);
+					success = true;
+
+				} catch(IOException ee) {
+					Log.error("Reconnect to socket server and send message failed. Caused by " +
+							ee.toString() + ". Retry time(s): " + retries);
+					synchronized (this) {
+						this.wait(CONNECTION_RETRY_SLEEP);
--- End diff --
This can throw an `InterruptedException`. It should be handled, for example by checking the termination criterion.
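One way to read the review comment above: an interrupt during the back-off wait should end the retry loop rather than escape as an unhandled exception. A minimal sketch under that assumption. `RetryLoopSketch`, `Action`, `runWithRetries`, and their parameters are hypothetical names chosen for the illustration, not the `SocketClientSink` implementation, and a plain `Thread.sleep` stands in for the `wait(CONNECTION_RETRY_SLEEP)` call in the diff:

```java
import java.io.IOException;

// Hypothetical illustration of a retry loop that treats an interrupt as a
// termination request.
final class RetryLoopSketch {

    /** Stand-in for "reconnect and resend the message". */
    interface Action {
        void run() throws IOException;
    }

    // Retries the action until it succeeds, the retry budget is used up, or
    // the thread is interrupted. A negative maxRetries means "retry forever".
    // Returns true only if the action eventually succeeded.
    static boolean runWithRetries(Action action, int maxRetries, long sleepMillis) {
        int retries = 0;
        while (maxRetries < 0 || retries < maxRetries) {
            try {
                action.run();
                return true;
            } catch (IOException e) {
                retries++;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for callers and stop retrying.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        boolean ok = runWithRetries(new Action() {
            @Override
            public void run() throws IOException {
                // Fail twice, then succeed on the third attempt.
                if (++calls[0] < 3) {
                    throw new IOException("connection refused");
                }
            }
        }, 5, 10);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```

Restoring the interrupt flag before returning lets the surrounding task notice the cancellation as well; whether the sink should instead rethrow is a design choice the sketch does not settle.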
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709158#comment-14709158 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37744832 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709160#comment-14709160 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37744884 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709157#comment-14709157 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37744808 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709159#comment-14709159 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37744849 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709168#comment-14709168 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37745343 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709171#comment-14709171 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37745469 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709232#comment-14709232 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37748110 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709285#comment-14709285 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37750633
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
@@ -73,13 +85,49 @@ public void intializeConnection() {
      * The incoming data
      */
     @Override
-    public void invoke(IN value) {
+    public void invoke(IN value) throws Exception {
         byte[] msg = schema.serialize(value);
         try {
             dataOutputStream.write(msg);
         } catch (IOException e) {
-            throw new RuntimeException("Cannot send message " + value.toString() +
-                    " to socket server at " + hostName + ":" + port, e);
+            Log.error("Cannot send message " + value.toString() +
+                    " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
+                    ". Trying to reconnect.");
+            retries = 0;
+            boolean success = false;
+            while ((retries < maxRetry || retryForever) && !success && isRunning){
+                try {
+
+                    if (dataOutputStream != null) {
+                        dataOutputStream.close();
+                    }
+
+                    if (client != null && !client.isClosed()) {
+                        client.close();
+                    }
+
+                    if (!retryForever){
+                        retries++;
+                    }
+
+                    client = new Socket(hostName, port);
+                    dataOutputStream = new DataOutputStream(client.getOutputStream());
+                    dataOutputStream.write(msg);
+                    success = true;
+
+                } catch(IOException ee) {
+                    Log.error("Reconnect to socket server and send message failed. Caused by " +
+                            ee.toString() + ". Retry time(s): " + retries);
+                    synchronized (this) {
--- End diff --

Locking on `this` is usually not recommended, because outside code that synchronizes on the same object can influence your behaviour here. Thus, use a private lock field instead.
Add a retry for SocketClientSink Key: FLINK-2536 URL: https://issues.apache.org/jira/browse/FLINK-2536 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.10 Reporter: Huang Wei Fix For: 0.10 Original Estimate: 168h Remaining Estimate: 168h I found that the SocketClientSink doesn't re-connect when it is disconnected from the socket server or gets an exception. I'd like to add a re-connect to the socket sink, like the one in the socket source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
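To illustrate the review comment above about not locking on `this`, here is a minimal sketch of a private lock field. The names `RetryPause` and `pause` are hypothetical and chosen only for this example; they are not part of the pull request or of Flink. The sketch assumes only that the sink wants to pause for a bounded time between reconnect attempts.

```java
/** Hypothetical helper, not Flink code: pauses between reconnect attempts. */
final class RetryPause {

    // Private monitor object. Code outside this class cannot synchronize on it,
    // so a caller that locks the RetryPause instance itself cannot interfere.
    private final Object lock = new Object();

    /**
     * Pauses the calling thread for at most waitMillis milliseconds.
     * Returns false if the thread was interrupted, true otherwise.
     * An early (spurious) wakeup is harmless here: it only shortens the pause.
     */
    boolean pause(long waitMillis) {
        if (waitMillis <= 0) {
            // Object.wait(0) would wait without a timeout, so reject it.
            throw new IllegalArgumentException("waitMillis must be positive");
        }
        synchronized (lock) {
            try {
                lock.wait(waitMillis);
                return true;
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can react to it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}

final class RetryPauseDemo {
    public static void main(String[] args) {
        RetryPause retryPause = new RetryPause();
        System.out.println("pause completed normally: " + retryPause.pause(50));
    }
}
```

With `synchronized (this)`, any code holding a reference to the sink could take the same monitor or call `notify` on it; the private field keeps the monitor an implementation detail.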
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709286#comment-14709286 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37750694
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
@@ -88,6 +136,7 @@ public void invoke(IN value) {
      */
     private void closeConnection(){
         try {
+            isRunning = false;
--- End diff --

You should wake up the waiting thread here. Otherwise, waiting on a lock does not make any sense.
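The point of this comment can be sketched as follows: the method that clears the running flag also calls `notifyAll()` on the lock the retry loop waits on, so a thread sleeping between retries returns immediately instead of sitting out its timeout. `StoppableRetryLoop`, `awaitNextAttempt`, `close` and `WakeUpDemo` are hypothetical names for illustration only; this is a sketch under those assumptions, not the code that was merged into Flink.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the sink's retry state; not Flink code. */
final class StoppableRetryLoop {

    private final Object lock = new Object();
    private boolean running = true; // guarded by lock

    /**
     * Waits up to waitMillis before the next reconnect attempt.
     * Returns false once close() has been called, so the caller can stop retrying.
     */
    boolean awaitNextAttempt(long waitMillis) {
        synchronized (lock) {
            if (running) {
                try {
                    lock.wait(waitMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return running;
        }
    }

    /** Clears the flag and wakes up any thread blocked in awaitNextAttempt. */
    void close() {
        synchronized (lock) {
            running = false;
            lock.notifyAll();
        }
    }
}

final class WakeUpDemo {

    /** Returns true if close() released a waiter well before its 60 s timeout. */
    static boolean closeWakesWaiter() {
        StoppableRetryLoop loop = new StoppableRetryLoop();
        AtomicBoolean stillRunning = new AtomicBoolean(true);
        Thread waiter = new Thread(() -> stillRunning.set(loop.awaitNextAttempt(60_000)));
        waiter.start();
        loop.close();
        try {
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive() && !stillRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter released by close(): " + closeWakesWaiter());
    }
}
```

Because the flag is read and written under the same lock, the waiter cannot miss the notification: it either sees `running == false` before waiting or is woken by `notifyAll()`.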
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709304#comment-14709304 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37751617 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709305#comment-14709305 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37751723 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709161#comment-14709161 ] ASF GitHub Bot commented on FLINK-2536: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37744939 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709166#comment-14709166 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37745169

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709228#comment-14709228 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37747966

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709315#comment-14709315 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37752541

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709153#comment-14709153 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37744540

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+    final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+    private final String host = "127.0.0.1";
+    private int port;
+    private String value;
+
+    public Thread t;
+
+    public SocketClientSinkTest() {
+    }
+
+    @Test
+    public void testSocketSink() throws Exception{
+        value = "";
+        ServerSocket server = new ServerSocket(0);
+        port = server.getLocalPort();
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                t = Thread.currentThread();
+                SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+                    @Override
+                    public byte[] serialize(String element) {
+                        return element.getBytes();
+                    }
+                };
+
+                try {
+                    SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+                    simpleSink.open(new Configuration());
+                    simpleSink.invoke("testSocketSinkInvoke");
+                    simpleSink.close();
+                } catch (Exception e){
+                    error.set(e);
+                }
+            }
+        }).start();
+
+        Socket sk = server.accept();
+        BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+                .getInputStream()));
+        value = rdr.readLine();
+
+        t.join();
+        server.close();
+
+        if (error.get() != null) {
+            Throwable t = error.get();
--- End diff --

I think it would be good to reset the error to `null` afterwards. Otherwise all subsequent tests will fail with the same error message, even though it is completely unrelated to them.

> Add a retry for SocketClientSink
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I found that the SocketClientSink does not reconnect when it is disconnected from the socket server or gets an exception.
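The review comment above, about clearing the recorded error once it has been reported, can be sketched as follows. This is only an illustration and not code from the pull request: the class `ErrorResetSketch` and the helpers `runInThread` and `takeError` are hypothetical names, and the holder is made `static` here on purpose so that state really is shared between calls. The sketch relies on `AtomicReference.getAndSet(null)` to read and clear the error in one step.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class, not part of Flink or of the pull request.
class ErrorResetSketch {

    // Shared holder for a failure raised in a spawned thread (static by assumption,
    // so a stale value would leak into later checks unless it is cleared).
    static final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

    /** Runs the task in a separate thread, records anything it throws, and waits for it. */
    static void runInThread(final Runnable task) throws InterruptedException {
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Throwable t) {
                    error.set(t);
                }
            }
        });
        worker.start();
        worker.join();
    }

    /** Returns the recorded failure (or null) and resets the holder to null atomically. */
    static Throwable takeError() {
        return error.getAndSet(null);
    }

    public static void main(String[] args) throws Exception {
        runInThread(new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("boom");
            }
        });

        Throwable first = takeError();
        System.out.println("first check: " + (first == null ? "no error" : first.getMessage()));

        // The holder was cleared by the first check, so nothing stale is reported here.
        System.out.println("second check: " + (takeError() == null ? "no error" : "stale error"));
    }
}
```

Reading and clearing in a single call avoids a window in which another thread could store a new error between a separate `get()` and `set(null)`.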
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709177#comment-14709177 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134175239

Hi @HuangWHWHW, I had some comments concerning the test cases.

> Add a retry for SocketClientSink
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I found that the SocketClientSink does not reconnect when it is disconnected from the socket server or gets an exception.
> I'd like to add a reconnect to the socket sink, like the one the socket source has.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709233#comment-14709233 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748179

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709243#comment-14709243 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134196700

Hi, thank you. I'll push an updated fix.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709245#comment-14709245 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748764

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709302#comment-14709302 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37751549

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709003#comment-14709003 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134107315

@tillrohrmann Hi, I have pushed a new fix and added a test for a successful retry. Would you please take a look? Thank you.

BTW: why doesn't the CI rerun?
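The reconnect behaviour requested in this issue, and the "retry success" case mentioned in the comment above, can be illustrated with a small generic retry loop. This is a minimal sketch under stated assumptions, not the implementation from the pull request: `RetrySketch`, `callWithRetry`, `maxRetries` and `delayMillis` are hypothetical names, and the failing connection is simulated with a counter instead of a real socket.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper class, not part of Flink or of the pull request.
class RetrySketch {

    /**
     * Calls the action once and, if it throws, up to maxRetries more times,
     * sleeping delayMillis between attempts. Rethrows the last failure
     * when every attempt has failed.
     */
    static <T> T callWithRetry(Callable<T> action, int maxRetries, long delayMillis) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] attempts = {0};

        // Simulated connection: fails on the first two attempts, succeeds on the third.
        String result = callWithRetry(new Callable<String>() {
            @Override
            public String call() throws Exception {
                attempts[0]++;
                if (attempts[0] < 3) {
                    throw new IOException("connection refused (simulated)");
                }
                return "connected";
            }
        }, 5, 10L);

        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

With `maxRetries` set to 0 the action runs exactly once and its exception is propagated unchanged, which corresponds to a "no retry" configuration.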
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710417#comment-14710417 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37824613

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14710403#comment-14710403 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37824211 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java --- @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}. 
+ */ +public class SocketClientSinkTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + private int port; + private String value; + + public Thread t; + + public SocketClientSinkTest() { + } + + @Test + public void testSocketSink() throws Exception{ + value = ; + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchemaString, byte[] simpleSchema = new SerializationSchemaString, byte[]() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + SocketClientSinkString simpleSink = new SocketClientSinkString(host, port, simpleSchema, 0); + simpleSink.open(new Configuration()); + simpleSink.invoke(testSocketSinkInvoke); + simpleSink.close(); + } catch (Exception e){ + error.set(e); + } + } + }).start(); + + Socket sk = server.accept(); + BufferedReader rdr = new BufferedReader(new InputStreamReader(sk + .getInputStream())); + value = rdr.readLine(); + + t.join(); + server.close(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(testSocketSinkInvoke, value); + } + + @Test + public void testSocketSinkNoRetry() throws Exception{ + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchemaString, byte[] simpleSchema = new SerializationSchemaString, byte[]() { +
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14710375#comment-14710375 ] ASF GitHub Bot commented on FLINK-2536: --- Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37823131 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java --- @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}. 
+ */ +public class SocketClientSinkTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + private int port; + private String value; + + public Thread t; + + public SocketClientSinkTest() { + } + + @Test + public void testSocketSink() throws Exception{ + value = ; + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchemaString, byte[] simpleSchema = new SerializationSchemaString, byte[]() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + SocketClientSinkString simpleSink = new SocketClientSinkString(host, port, simpleSchema, 0); + simpleSink.open(new Configuration()); + simpleSink.invoke(testSocketSinkInvoke); + simpleSink.close(); + } catch (Exception e){ + error.set(e); + } + } + }).start(); + + Socket sk = server.accept(); + BufferedReader rdr = new BufferedReader(new InputStreamReader(sk + .getInputStream())); + value = rdr.readLine(); + + t.join(); + server.close(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(testSocketSinkInvoke, value); + } + + @Test + public void testSocketSinkNoRetry() throws Exception{ + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchemaString, byte[] simpleSchema = new SerializationSchemaString, byte[]() { +
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706844#comment-14706844 ] ASF GitHub Bot commented on FLINK-2536:
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37643848
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java --- @@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{ assertEquals("Connected", this.access); assertEquals("testSocketSinkInvoke", value); } + + public Thread t; + + @Test + public void testSocketSinkNoRetry() throws Exception{ + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0); + simpleSink.open(new Configuration()); + Thread.sleep(1); --- End diff --
Yes, I understand. I'll make the change :-)
> Add a retry for SocketClientSink
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't reconnect when it is disconnected from
> the socket server or gets an exception.
> I'd like to add a reconnect to the socket sink, like the socket source has.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706434#comment-14706434 ] ASF GitHub Bot commented on FLINK-2536:
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37617867
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java --- @@ -73,13 +84,44 @@ public void intializeConnection() { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { dataOutputStream.write(msg); } catch (IOException e) { - throw new RuntimeException("Cannot send message " + value.toString() + - " to socket server at " + hostName + ":" + port, e); + retries = 0; + boolean success = false; + while ((retries < maxRetry || retryForever) && !success && isRunning){ + try { + + if (dataOutputStream != null) { + dataOutputStream.close(); + } + + if (client != null && !client.isClosed()) { + client.close(); + } + + if (!retryForever){ + retries++; + } + + client = new Socket(hostName, port); + dataOutputStream = new DataOutputStream(client.getOutputStream()); + dataOutputStream.write(msg); + success = true; + + }catch(Exception ee){ --- End diff --
Well, it actually depends on the exceptions for which we want to restart. If I'm not mistaken, `new Socket()` and `dataOutputStream.write` only throw `IOException`s. Thus, we should change the catch clause to `IOException`.
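The point about catching only `IOException` can be illustrated with a minimal sketch. This is not the code from the pull request: `IoRetry`, `IoAction`, and `runWithRetry` are hypothetical names introduced here, and the reconnect-and-write step is abstracted into a callback. The sketch assumes that only I/O failures should trigger a retry, so a programming error such as an `IllegalStateException` propagates immediately.

```java
import java.io.IOException;

/** Hypothetical helper, not part of Flink: retries an action on IOException only. */
final class IoRetry {

    /** An action that may fail with an IOException, e.g. reconnect and write. */
    interface IoAction {
        void run() throws IOException;
    }

    /**
     * Runs the action until it succeeds or maxRetries attempts have failed.
     * Returns the number of failed attempts that preceded the success.
     * Unchecked exceptions are deliberately not caught and are never retried.
     */
    static int runWithRetry(IoAction action, int maxRetries, long sleepMillis)
            throws IOException, InterruptedException {
        IOException last = null;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (IOException e) {
                last = e;
                // Report the failure so the user can see why a retry happened.
                System.err.println("Attempt " + (attempt + 1) + " failed: " + e.getMessage());
                Thread.sleep(sleepMillis);
            }
        }
        throw new IOException("Giving up after " + maxRetries + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Simulated connection that fails twice before it succeeds.
        int failed = runWithRetry(new IoAction() {
            @Override
            public void run() throws IOException {
                if (calls[0]++ < 2) {
                    throw new IOException("server not reachable yet");
                }
            }
        }, 5, 10L);
        System.out.println("succeeded after " + failed + " failed attempts");
    }
}
```

With a narrow catch clause, the retry loop cannot silently swallow bugs in the surrounding code, which a blanket `catch (Exception ee)` would do.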
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706436#comment-14706436 ] ASF GitHub Bot commented on FLINK-2536:
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37618025
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java --- @@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{ assertEquals("Connected", this.access); assertEquals("testSocketSinkInvoke", value); } + + public Thread t; + + @Test + public void testSocketSinkNoRetry() throws Exception{ + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0); + simpleSink.open(new Configuration()); + Thread.sleep(1); --- End diff --
To be honest, I'm not a big fan of `sleep`-based synchronization. Too often these kinds of tests have failed on Travis. Usually, if you use sleeps, the interval is either too short to allow different interleavings if you have bad luck, or too long, which makes the test slow. Therefore, I'd propose a simple wait object on which you wait from within the thread. Once you've closed the server socket, you can trigger the `notifyAll` method on this wait object to let the thread continue.
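The wait-object proposal can be sketched roughly as below. This is an illustration under assumptions, not the test that ended up in the pull request: `WaitObjectSketch`, `awaitServerClosed`, and `signalServerClosed` are hypothetical names, and the socket calls are replaced by recorded events so the example stays self-contained. The boolean flag is an addition in this sketch; it guards against a `notifyAll` that fires before the thread has started waiting.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical sketch: a wait object that replaces Thread.sleep in a test. */
final class WaitObjectSketch {

    private final Object lock = new Object();
    private boolean serverClosed; // guarded by lock

    /** Called from the spawned thread; blocks until the server has been closed. */
    void awaitServerClosed() throws InterruptedException {
        synchronized (lock) {
            while (!serverClosed) { // loop also handles spurious wakeups
                lock.wait();
            }
        }
    }

    /** Called from the test thread right after closing the server socket. */
    void signalServerClosed() {
        synchronized (lock) {
            serverClosed = true;
            lock.notifyAll();
        }
    }

    /** Runs the handshake once and returns the observed order of events. */
    static List<String> runDemo() throws InterruptedException {
        final WaitObjectSketch gate = new WaitObjectSketch();
        final List<String> events = new CopyOnWriteArrayList<String>();

        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    gate.awaitServerClosed();       // instead of Thread.sleep(...)
                    events.add("worker continues"); // a real test would call the sink here
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();

        events.add("server closed"); // stands in for server.close()
        gate.signalServerClosed();
        worker.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Because the worker only proceeds after the signal, the ordering no longer depends on timing. A `java.util.concurrent.CountDownLatch` would give the same guarantee with less code.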
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706059#comment-14706059 ] ASF GitHub Bot commented on FLINK-2536:
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37599021
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java --- @@ -73,13 +84,44 @@ public void intializeConnection() { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { dataOutputStream.write(msg); } catch (IOException e) { - throw new RuntimeException("Cannot send message " + value.toString() + - " to socket server at " + hostName + ":" + port, e); + retries = 0; + boolean success = false; + while ((retries < maxRetry || retryForever) && !success && isRunning){ + try { + + if (dataOutputStream != null) { + dataOutputStream.close(); + } + + if (client != null && !client.isClosed()) { + client.close(); + } + + if (!retryForever){ + retries++; + } + + client = new Socket(hostName, port); + dataOutputStream = new DataOutputStream(client.getOutputStream()); + dataOutputStream.write(msg); + success = true; + + }catch(Exception ee){ + + Thread.sleep(CONNECTION_RETRY_SLEEP); + continue; --- End diff --
Sorry, that was careless of me. I'll just remove it.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706058#comment-14706058 ] ASF GitHub Bot commented on FLINK-2536:
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37598999
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java --- @@ -73,13 +84,44 @@ public void intializeConnection() { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { dataOutputStream.write(msg); } catch (IOException e) { - throw new RuntimeException("Cannot send message " + value.toString() + - " to socket server at " + hostName + ":" + port, e); + retries = 0; + boolean success = false; + while ((retries < maxRetry || retryForever) && !success && isRunning){ + try { + + if (dataOutputStream != null) { + dataOutputStream.close(); + } + + if (client != null && !client.isClosed()) { + client.close(); + } + + if (!retryForever){ + retries++; + } + + client = new Socket(hostName, port); + dataOutputStream = new DataOutputStream(client.getOutputStream()); + dataOutputStream.write(msg); + success = true; + + }catch(Exception ee){ --- End diff --
Yes, it's up to you.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706060#comment-14706060 ] ASF GitHub Bot commented on FLINK-2536:
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37599157
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java --- @@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{ assertEquals("Connected", this.access); assertEquals("testSocketSinkInvoke", value); } + + public Thread t; + + @Test + public void testSocketSinkNoRetry() throws Exception{ + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0); + simpleSink.open(new Configuration()); + Thread.sleep(1); --- End diff --
This sleep waits for the socket server to close.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706061#comment-14706061 ] ASF GitHub Bot commented on FLINK-2536:
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/1030#issuecomment-133238105
> Could we also add a test case where we test that the SocketClientSink can reconnect against a newly opened socket after it has been closed? This would be great.
Good idea! I will try to do it.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705502#comment-14705502 ] ASF GitHub Bot commented on FLINK-2536:
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37563984
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java --- @@ -73,13 +84,44 @@ public void intializeConnection() { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { dataOutputStream.write(msg); } catch (IOException e) { - throw new RuntimeException("Cannot send message " + value.toString() + - " to socket server at " + hostName + ":" + port, e); + retries = 0; + boolean success = false; + while ((retries < maxRetry || retryForever) && !success && isRunning){ + try { + + if (dataOutputStream != null) { + dataOutputStream.close(); + } + + if (client != null && !client.isClosed()) { + client.close(); + } + + if (!retryForever){ + retries++; + } + + client = new Socket(hostName, port); + dataOutputStream = new DataOutputStream(client.getOutputStream()); + dataOutputStream.write(msg); + success = true; + + }catch(Exception ee){ + + Thread.sleep(CONNECTION_RETRY_SLEEP); --- End diff --
Maybe it's also better to use a `synchronized(lock) { lock.wait(CONNECTION_RETRY_SLEEP) }` here. That way we can call the `lock.notifyAll` method in the `closeConnection` method to directly wake up all sleeping threads. This will speed up the shutdown procedure.
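A rough sketch of this suggestion follows, assuming a lock object shared between the retry loop and the close path. `RetryPause`, `pauseBeforeRetry`, and `close` are hypothetical names, not the sink's actual members; `close` stands in for `closeConnection`, and the reconnect attempt itself is left out.

```java
/** Hypothetical sketch, not Flink code: a retry pause that close() can cut short. */
final class RetryPause {

    private final Object lock = new Object();
    private boolean isRunning = true; // guarded by lock

    /**
     * Waits up to timeoutMillis (must be greater than 0) before the next retry.
     * Returns false once close() has been called, so the caller can stop retrying.
     * The wait may also end early; the caller then simply retries sooner.
     */
    boolean pauseBeforeRetry(long timeoutMillis) throws InterruptedException {
        synchronized (lock) {
            if (isRunning) {
                lock.wait(timeoutMillis);
            }
            return isRunning;
        }
    }

    /** Stands in for closeConnection(): stops retrying and wakes all waiting threads. */
    void close() {
        synchronized (lock) {
            isRunning = false;
            lock.notifyAll();
        }
    }

    /** Returns true if a 60-second pause ends well before its timeout after close(). */
    static boolean demoStopsEarly() throws InterruptedException {
        final RetryPause pause = new RetryPause();
        Thread retrying = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (pause.pauseBeforeRetry(60000L)) {
                        // a real sink would try to reconnect here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        long start = System.nanoTime();
        retrying.start();
        pause.close();
        retrying.join();
        long elapsedMillis = (System.nanoTime() - start) / 1000000L;
        return elapsedMillis < 10000L;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped early: " + demoStopsEarly());
    }
}
```

With `Thread.sleep`, shutdown would have to wait out the remaining sleep interval. With `wait` and `notifyAll`, the retrying thread leaves its pause as soon as `close()` runs.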
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705487#comment-14705487 ] ASF GitHub Bot commented on FLINK-2536:
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37563493
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java --- @@ -16,22 +16,18 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.functions; +package org.apache.flink.streaming.api.functions.sink; -import java.io.IOException; +import java.io.*; --- End diff --
We don't use star imports.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705467#comment-14705467 ] ASF GitHub Bot commented on FLINK-2536:
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37562745
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java --- @@ -73,13 +84,44 @@ public void intializeConnection() { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { dataOutputStream.write(msg); } catch (IOException e) { - throw new RuntimeException("Cannot send message " + value.toString() + - " to socket server at " + hostName + ":" + port, e); + retries = 0; + boolean success = false; + while ((retries < maxRetry || retryForever) && !success && isRunning){ + try { + + if (dataOutputStream != null) { + dataOutputStream.close(); + } + + if (client != null && !client.isClosed()) { + client.close(); + } + + if (!retryForever){ + retries++; + } + + client = new Socket(hostName, port); + dataOutputStream = new DataOutputStream(client.getOutputStream()); + dataOutputStream.write(msg); + success = true; + + }catch(Exception ee){ --- End diff --
We usually add whitespace around keywords.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705475#comment-14705475 ] ASF GitHub Bot commented on FLINK-2536:
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37562991
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java --- @@ -73,13 +84,44 @@ public void intializeConnection() { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { dataOutputStream.write(msg); } catch (IOException e) { - throw new RuntimeException("Cannot send message " + value.toString() + - " to socket server at " + hostName + ":" + port, e); + retries = 0; + boolean success = false; + while ((retries < maxRetry || retryForever) && !success && isRunning){ + try { + + if (dataOutputStream != null) { + dataOutputStream.close(); + } + + if (client != null && !client.isClosed()) { + client.close(); + } + + if (!retryForever){ + retries++; + } + + client = new Socket(hostName, port); + dataOutputStream = new DataOutputStream(client.getOutputStream()); + dataOutputStream.write(msg); + success = true; + + }catch(Exception ee){ + + Thread.sleep(CONNECTION_RETRY_SLEEP); + continue; --- End diff --
Why continue here?
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705482#comment-14705482 ] ASF GitHub Bot commented on FLINK-2536:
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37563275
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java --- @@ -73,13 +84,44 @@ public void intializeConnection() { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { dataOutputStream.write(msg); } catch (IOException e) { - throw new RuntimeException("Cannot send message " + value.toString() + - " to socket server at " + hostName + ":" + port, e); --- End diff --
I think we should at least log the exception.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705527#comment-14705527 ] ASF GitHub Bot commented on FLINK-2536:
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1030#issuecomment-133111629
Hi @HuangWHWHW, I had some minor comments. Could we also add a test case where we test that the `SocketClientSink` can reconnect against a newly opened socket after it has been closed? This would be great.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
    [ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705486#comment-14705486 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37563424

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
          *            The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            retries = 0;
    +            boolean success = false;
    +            while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                try {
    +
    +                    if (dataOutputStream != null) {
    +                        dataOutputStream.close();
    +                    }
    +
    +                    if (client != null && !client.isClosed()) {
    +                        client.close();
    +                    }
    +
    +                    if (!retryForever){
    +                        retries++;
    +                    }
    +
    +                    client = new Socket(hostName, port);
    +                    dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                    dataOutputStream.write(msg);
    +                    success = true;
    +
    +                }catch(Exception ee){
    +
    +                    Thread.sleep(CONNECTION_RETRY_SLEEP);
    --- End diff --

    We should definitely log the exception so that the user can understand why the sink had problems.
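A minimal sketch of what logging inside the retry loop might look like, so that each failed attempt leaves a trace. This is not the code from the pull request: `LoggedRetry`, `IoAction` and `runWithRetry` are hypothetical names, and `java.util.logging` is used only to keep the example self-contained (the project's own logger would be used in practice).

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LoggedRetry {

    private static final Logger LOG = Logger.getLogger(LoggedRetry.class.getName());

    /** Hypothetical abstraction for "reconnect and write the message". */
    @FunctionalInterface
    interface IoAction {
        void run() throws IOException;
    }

    /**
     * Runs the action up to maxRetry times and logs every failure with its cause.
     * Returns the number of attempts used; throws if all attempts failed.
     */
    static int runWithRetry(IoAction action, int maxRetry, long sleepMillis)
            throws IOException, InterruptedException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxRetry; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (IOException e) {
                last = e;
                // Log the exception itself so the user can see why the attempt failed.
                LOG.log(Level.WARNING, "Attempt " + attempt + " of " + maxRetry + " failed", e);
                if (attempt < maxRetry) {
                    Thread.sleep(sleepMillis);
                }
            }
        }
        throw new IOException("Giving up after " + maxRetry + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated connection that fails twice before succeeding.
        int attempts = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("connection refused (simulated)");
            }
        }, 5, 10L);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

Keeping the last exception as the cause of the final error means the original failure reason is still visible when the retries are exhausted.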
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
    [ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705485#comment-14705485 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37563322

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
          *            The incoming data
          */
         @Override
    -    public void invoke(IN value) {
    +    public void invoke(IN value) throws Exception {
             byte[] msg = schema.serialize(value);
             try {
                 dataOutputStream.write(msg);
             } catch (IOException e) {
    -            throw new RuntimeException("Cannot send message " + value.toString() +
    -                    " to socket server at " + hostName + ":" + port, e);
    +            retries = 0;
    +            boolean success = false;
    +            while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                try {
    +
    +                    if (dataOutputStream != null) {
    +                        dataOutputStream.close();
    +                    }
    +
    +                    if (client != null && !client.isClosed()) {
    +                        client.close();
    +                    }
    +
    +                    if (!retryForever){
    +                        retries++;
    +                    }
    +
    +                    client = new Socket(hostName, port);
    +                    dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                    dataOutputStream.write(msg);
    +                    success = true;
    +
    +                }catch(Exception ee){
    --- End diff --

    Do we really want to catch all exceptions here? Maybe `IOException` is enough?
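To illustrate the concern about the catch scope, here is a small self-contained comparison. It is only a sketch with hypothetical names (`CatchScope`, `Action`), not the sink's code: with `catch (Exception)` a programming error such as an `IllegalStateException` is retried and hidden, while with `catch (IOException)` it propagates on the first call.

```java
import java.io.IOException;

final class CatchScope {

    /** Hypothetical stand-in for "reconnect and write". */
    @FunctionalInterface
    interface Action {
        void run() throws Exception;
    }

    /** Broad variant: retries on anything, so a bug is retried and silently swallowed. */
    static int callsWithBroadCatch(Action action, int maxRetry) {
        int calls = 0;
        while (calls < maxRetry) {
            calls++;
            try {
                action.run();
                break;
            } catch (Exception e) {
                // swallowed: this includes RuntimeExceptions caused by bugs
            }
        }
        return calls;
    }

    /** Narrow variant: only I/O failures are retried; anything else propagates. */
    static int callsWithNarrowCatch(Action action, int maxRetry) throws Exception {
        int calls = 0;
        while (calls < maxRetry) {
            calls++;
            try {
                action.run();
                break;
            } catch (IOException e) {
                // connection problem: try again
            }
        }
        return calls;
    }

    public static void main(String[] args) throws Exception {
        Action buggy = () -> {
            throw new IllegalStateException("bug, not a connection problem");
        };
        System.out.println("broad catch made " + callsWithBroadCatch(buggy, 3) + " calls");
        try {
            callsWithNarrowCatch(buggy, 3);
        } catch (IllegalStateException e) {
            System.out.println("narrow catch propagated: " + e.getMessage());
        }
    }
}
```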
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
    [ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705504#comment-14705504 ]

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37564115

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
             assertEquals("Connected", this.access);
             assertEquals("testSocketSinkInvoke", value);
         }
    +
    +    public Thread t;
    +
    +    @Test
    +    public void testSocketSinkNoRetry() throws Exception{
    +        ServerSocket server = new ServerSocket(0);
    +        port = server.getLocalPort();
    +
    +        new Thread(new Runnable() {
    +
    +            @Override
    +            public void run() {
    +                t = Thread.currentThread();
    +                SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +                    @Override
    +                    public byte[] serialize(String element) {
    +                        return element.getBytes();
    +                    }
    +                };
    +
    +                try {
    +                    SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +                    simpleSink.open(new Configuration());
    +                    Thread.sleep(1);
    --- End diff --

    Why do we have to sleep here?
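The thread does not say what the sleep is meant to wait for. If the intent is to let the worker thread reach a certain point before the test continues, one common alternative to a fixed sleep is an explicit signal such as a `CountDownLatch`. The following is only a sketch under that assumption; the class name and the `"opened"` state are hypothetical and stand in for whatever `simpleSink.open(...)` achieves in the real test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class LatchInsteadOfSleep {

    /** Returns the state observed by the test thread after the worker has signalled. */
    static String run() throws InterruptedException {
        CountDownLatch opened = new CountDownLatch(1);
        AtomicReference<String> state = new AtomicReference<>("not opened");

        Thread worker = new Thread(() -> {
            state.set("opened");   // stands in for the sink being opened
            opened.countDown();    // tell the test thread it may proceed
        });
        worker.start();

        // Wait for the signal instead of guessing a sleep duration.
        if (!opened.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not signal in time");
        }
        worker.join();
        return state.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("state seen by test thread: " + run());
    }
}
```

The timeout on `await` keeps the test from hanging if the worker never reaches the signal.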
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
    [ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699265#comment-14699265 ]

ASF GitHub Bot commented on FLINK-2536:
---

GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/1030

    [FLINK-2536][streaming]add a re-connect for socket sink

    add a re-connect in function invoke() when it throws exception.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2536

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1030.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1030

commit 85d5bb50419d6b803a9fc966dd4f95fcd042e21c
Author: HuangWHWHW 404823...@qq.com
Date:   2015-08-17T09:32:04Z

    [FLINK-2536][streaming]add a re-connect for socket sink