Repository: flink
Updated Branches:
  refs/heads/master 0d856b34f -> 1829819b6
[FLINK-6384] [py] Remove python binary check via additional process

The PythonStreamer used to check for the existence of the python binary by
starting a python process. This process was not closed afterwards. This caused
the PythonPlanBinderTest to fail locally.

I think the check whether a python binary exists is not necessary since the
subsequent python command would fail anyway if there is no binary available on
the system. The system failure message is that there is no such file or
directory. This error message should be descriptive enough in order to debug
such a problem.

This closes #3774.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1829819b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1829819b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1829819b

Branch: refs/heads/master
Commit: 1829819b64fb32740bcbd4d5074e3fa138276f89
Parents: 0d856b3
Author: Till Rohrmann <[email protected]>
Authored: Tue Apr 25 20:41:58 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Apr 26 15:54:56 2017 +0200

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java | 32 ++++++++++++--------
 .../flink/python/api/PythonPlanBinderTest.java | 21 ++++++++++---
 2 files changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1829819b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 97d5780..cc4ba43 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -19,6 +19,7 @@ import org.apache.flink.python.api.PythonOptions; import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer; import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer; import org.apache.flink.python.api.streaming.util.StreamPrinter; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,13 +115,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH); - try { - Runtime.getRuntime().exec(pythonBinaryPath); - } catch (IOException ignored) { - throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); - } - - process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + config.getString(PLAN_ARGUMENTS_KEY, "")); + process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")}); outPrinter = new Thread(new StreamPrinter(process.getInputStream())); outPrinter.start(); errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg)); @@ -130,8 +125,9 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable @Override public void run() { try { - destroyProcess(); - } catch (IOException ignored) { + destroyProcess(process); + } catch (IOException ioException) { + LOG.warn("Could not destroy python process.", ioException); } } }; @@ -192,20 +188,30 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable * @throws IOException */ public void close() throws IOException { + Throwable throwable = null; + try { socket.close(); sender.close(); 
receiver.close(); - } catch (Exception e) { - LOG.error("Exception occurred while closing Streamer. :{}", e.getMessage()); + } catch (Throwable t) { + throwable = t; } - destroyProcess(); + + try { + destroyProcess(process); + } catch (Throwable t) { + throwable = ExceptionUtils.firstOrSuppressed(t, throwable); + } + if (shutdownThread != null) { Runtime.getRuntime().removeShutdownHook(shutdownThread); } + + ExceptionUtils.tryRethrowIOException(throwable); } - private void destroyProcess() throws IOException { + public static void destroyProcess(Process process) throws IOException { try { process.exitValue(); } catch (IllegalThreadStateException ignored) { //process still active http://git-wip-us.apache.org/repos/asf/flink/blob/1829819b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index ba8ea78..20f3503 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -16,6 +16,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.python.api.streaming.data.PythonStreamer; import org.apache.flink.test.util.JavaProgramTestBase; import java.io.IOException; @@ -50,21 +51,33 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { return files; } - private static boolean isPython2Supported() { + private static boolean isPython2Supported() throws IOException { + Process process = null; + try { - Runtime.getRuntime().exec("python"); + 
process = Runtime.getRuntime().exec("python"); return true; } catch (IOException ex) { return false; + } finally { + if (process != null) { + PythonStreamer.destroyProcess(process); + } } } - private static boolean isPython3Supported() { + private static boolean isPython3Supported() throws IOException { + Process process = null; + try { - Runtime.getRuntime().exec("python3"); + process = Runtime.getRuntime().exec("python3"); return true; } catch (IOException ex) { return false; + } finally { + if (process != null) { + PythonStreamer.destroyProcess(process); + } } }
