[hotfix] [py] Code cleanup - PythonStreamer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd588efe Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd588efe Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd588efe Branch: refs/heads/table-retraction Commit: dd588efe6470dfdcad51249ff36f12cfde7b75da Parents: 258ed17 Author: zentol <ches...@apache.org> Authored: Thu Mar 30 23:13:07 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Apr 5 20:44:30 2017 +0200 ---------------------------------------------------------------------- .../python/api/streaming/data/PythonStreamer.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dd588efe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index 3c79b1f..006a1b2 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -84,7 +84,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable this.setID = setID; this.usePython3 = PythonPlanBinder.usePython3; planArguments = PythonPlanBinder.arguments.toString(); - receiver = new PythonReceiver(usesByteArray); + receiver = new PythonReceiver<>(usesByteArray); this.function = function; this.sender = sender; } @@ -162,10 +162,22 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable private void checkPythonProcessHealth() { try { int value = process.exitValue(); + try { + outPrinter.join(); + } catch (InterruptedException ignored) { + outPrinter.interrupt(); + Thread.interrupted(); + } + try { + errorPrinter.join(); + } catch (InterruptedException ignored) { + errorPrinter.interrupt(); + Thread.interrupted(); + } if (value != 0) { - throw new RuntimeException("Plan file caused an error. Check log-files for details."); + throw new RuntimeException("Plan file caused an error. Check log-files for details." + msg.get()); } else { - throw new RuntimeException("Plan file exited prematurely without an error."); + throw new RuntimeException("Plan file exited prematurely without an error." + msg.get()); } } catch (IllegalThreadStateException ignored) {//Process still running }