[
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725221#comment-14725221
]
ASF GitHub Bot commented on FLINK-2536:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/1030#discussion_r38408571
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
---
@@ -73,13 +90,56 @@ public void intializeConnection() {
* The incoming data
*/
@Override
- public void invoke(IN value) {
+ public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
- throw new RuntimeException("Cannot send message " +
value.toString() +
- " to socket server at " + hostName +
":" + port, e);
+ LOG.error("Cannot send message " + value.toString() +
+ " to socket server at " + hostName +
":" + port + ". Caused by " + e.toString() +
+ ". Trying to reconnect.");
+ retries = 0;
+ boolean success = false;
+ while ((retries < maxRetry || retryForever) && !success
&& isRunning){
+ try {
+
+ if (dataOutputStream != null) {
+ dataOutputStream.close();
+ }
+
+ if (client != null &&
!client.isClosed()) {
+ client.close();
+ }
+
+ retries++;
+
+ client = new Socket(hostName, port);
+ dataOutputStream = new
DataOutputStream(client.getOutputStream());
+ dataOutputStream.write(msg);
+ success = true;
+
+ } catch(IOException ee) {
+ LOG.error("Reconnect to socket server
and send message failed. Caused by " +
+ ee.toString() + ".
Retry time(s):" + retries);
+
+ if (lock == null) {
--- End diff --
The `lock` part can probably be replaced by a simple `Thread.sleep(...)`.
> Add a retry for SocketClientSink
> --------------------------------
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)