Repository: flink
Updated Branches:
  refs/heads/master 0d856b34f -> 1829819b6


[FLINK-6384] [py] Remove python binary check via additional process

The PythonStreamer used to check for the existence of the python binary by
starting a python process. This process was not closed afterwards. This caused
the PythonPlanBinderTest to fail locally.

I think the check whether a python binary exists is not necessary since the
subsequent python command would fail anyway if there is no binary available on
the system. The system failure message is that there is no such file or 
directory.
This error message should be descriptive enough in order to debug such a 
problem.

This closes #3774.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1829819b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1829819b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1829819b

Branch: refs/heads/master
Commit: 1829819b64fb32740bcbd4d5074e3fa138276f89
Parents: 0d856b3
Author: Till Rohrmann <[email protected]>
Authored: Tue Apr 25 20:41:58 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Apr 26 15:54:56 2017 +0200

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 32 ++++++++++++--------
 .../flink/python/api/PythonPlanBinderTest.java  | 21 ++++++++++---
 2 files changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1829819b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 97d5780..cc4ba43 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -19,6 +19,7 @@ import org.apache.flink.python.api.PythonOptions;
 import 
org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
 import 
org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,13 +115,7 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
 
                String pythonBinaryPath = 
config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
-               try {
-                       Runtime.getRuntime().exec(pythonBinaryPath);
-               } catch (IOException ignored) {
-                       throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
-               }
-
-               process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B 
" + planPath + config.getString(PLAN_ARGUMENTS_KEY, ""));
+               process = Runtime.getRuntime().exec(new String[] 
{pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, 
"")});
                outPrinter = new Thread(new 
StreamPrinter(process.getInputStream()));
                outPrinter.start();
                errorPrinter = new Thread(new 
StreamPrinter(process.getErrorStream(), msg));
@@ -130,8 +125,9 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
                        @Override
                        public void run() {
                                try {
-                                       destroyProcess();
-                               } catch (IOException ignored) {
+                                       destroyProcess(process);
+                               } catch (IOException ioException) {
+                                       LOG.warn("Could not destroy python 
process.", ioException);
                                }
                        }
                };
@@ -192,20 +188,30 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
         * @throws IOException
         */
        public void close() throws IOException {
+               Throwable throwable = null;
+
                try {
                        socket.close();
                        sender.close();
                        receiver.close();
-               } catch (Exception e) {
-                       LOG.error("Exception occurred while closing Streamer. 
:{}", e.getMessage());
+               } catch (Throwable t) {
+                       throwable = t;
                }
-               destroyProcess();
+
+               try {
+                       destroyProcess(process);
+               } catch (Throwable t) {
+                       throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
+               }
+
                if (shutdownThread != null) {
                        Runtime.getRuntime().removeShutdownHook(shutdownThread);
                }
+
+               ExceptionUtils.tryRethrowIOException(throwable);
        }
 
-       private void destroyProcess() throws IOException {
+       public static void destroyProcess(Process process) throws IOException {
                try {
                        process.exitValue();
                } catch (IllegalThreadStateException ignored) { //process still 
active

http://git-wip-us.apache.org/repos/asf/flink/blob/1829819b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
 
b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index ba8ea78..20f3503 100644
--- 
a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ 
b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -16,6 +16,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 import java.io.IOException;
@@ -50,21 +51,33 @@ public class PythonPlanBinderTest extends 
JavaProgramTestBase {
                return files;
        }
 
-       private static boolean isPython2Supported() {
+       private static boolean isPython2Supported() throws IOException {
+               Process process = null;
+
                try {
-                       Runtime.getRuntime().exec("python");
+                       process = Runtime.getRuntime().exec("python");
                        return true;
                } catch (IOException ex) {
                        return false;
+               } finally {
+                       if (process != null) {
+                               PythonStreamer.destroyProcess(process);
+                       }
                }
        }
 
-       private static boolean isPython3Supported() {
+       private static boolean isPython3Supported() throws IOException {
+               Process process = null;
+
                try {
-                       Runtime.getRuntime().exec("python3");
+                       process = Runtime.getRuntime().exec("python3");
                        return true;
                } catch (IOException ex) {
                        return false;
+               } finally {
+                       if (process != null) {
+                               PythonStreamer.destroyProcess(process);
+                       }
                }
        }
 

Reply via email to