[FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2bb5de6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2bb5de6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2bb5de6

Branch: refs/heads/master
Commit: c2bb5de6faeb314bd8cdb8c279a5cd9bfb4df059
Parents: 03889ae
Author: zentol <ches...@apache.org>
Authored: Wed Mar 15 17:06:12 2017 +0100
Committer: zentol <ches...@apache.org>
Committed: Thu Mar 16 17:03:38 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 23 ++++++++++++-
 .../api/streaming/plan/PythonPlanStreamer.java  | 35 ++++++++++++++------
 2 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2bb5de6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index c968bd6..13275c4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -92,6 +92,7 @@ public class PythonStreamer implements Serializable {
         */
        public void open() throws IOException {
                server = new ServerSocket(0);
+               server.setSoTimeout(50);
                startPython();
        }
 
@@ -149,11 +150,31 @@ public class PythonStreamer implements Serializable {
                } catch (IllegalThreadStateException ise) { //process still active -> start receiving data
                }
 
-               socket = server.accept();
+               while (true) {
+                       try {
+                               socket = server.accept();
+                               break;
+                       } catch (SocketTimeoutException ignored) {
+                               checkPythonProcessHealth();
+                       }
+               }
                in = new DataInputStream(socket.getInputStream());
                out = new DataOutputStream(socket.getOutputStream());
        }
 
+       private void checkPythonProcessHealth() {
+               try {
+                       int value = process.exitValue();
+                       if (value != 0) {
+                               throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+                       }
+                       if (value == 0) {
+                               throw new RuntimeException("Plan file exited prematurely without an error.");
+                       }
+               } catch (IllegalThreadStateException ise) {//Process still running
+               }
+       }
+
        /**
         * Closes this streamer.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/c2bb5de6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 06af9d8..7b0b63f 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -19,6 +19,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -50,8 +51,16 @@ public class PythonPlanStreamer implements Serializable {
 
        public void open(String tmpPath, String args) throws IOException {
                server = new ServerSocket(0);
+               server.setSoTimeout(50);
                startPython(tmpPath, args);
-               socket = server.accept();
+               while (true) {
+                       try {
+                               socket = server.accept();
+                               break;
+                       } catch (SocketTimeoutException ignored) {
+                               checkPythonProcessHealth();
+                       }
+               }
                sender = new PythonPlanSender(socket.getOutputStream());
                receiver = new PythonPlanReceiver(socket.getInputStream());
        }
@@ -74,16 +83,7 @@ public class PythonPlanStreamer implements Serializable {
                } catch (InterruptedException ex) {
                }
 
-               try {
-                       int value = process.exitValue();
-                       if (value != 0) {
-                               throw new RuntimeException("Plan file caused an error. Check log-files for details.");
-                       }
-                       if (value == 0) {
-                               throw new RuntimeException("Plan file exited prematurely without an error.");
-                       }
-               } catch (IllegalThreadStateException ise) {//Process still running
-               }
+               checkPythonProcessHealth();
 
                process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
                process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
@@ -98,4 +98,17 @@ public class PythonPlanStreamer implements Serializable {
                        process.destroy();
                }
        }
+
+       private void checkPythonProcessHealth() {
+               try {
+                       int value = process.exitValue();
+                       if (value != 0) {
+                               throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+                       }
+                       if (value == 0) {
+                               throw new RuntimeException("Plan file exited prematurely without an error.");
+                       }
+               } catch (IllegalThreadStateException ise) {//Process still running
+               }
+       }
 }

Reply via email to