[hotfix] [py] Code cleanup - PythonStreamer

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd588efe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd588efe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd588efe

Branch: refs/heads/table-retraction
Commit: dd588efe6470dfdcad51249ff36f12cfde7b75da
Parents: 258ed17
Author: zentol <ches...@apache.org>
Authored: Thu Mar 30 23:13:07 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 5 20:44:30 2017 +0200

----------------------------------------------------------------------
 .../python/api/streaming/data/PythonStreamer.java | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd588efe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 3c79b1f..006a1b2 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -84,7 +84,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
                this.setID = setID;
                this.usePython3 = PythonPlanBinder.usePython3;
                planArguments = PythonPlanBinder.arguments.toString();
-               receiver = new PythonReceiver(usesByteArray);
+               receiver = new PythonReceiver<>(usesByteArray);
                this.function = function;
                this.sender = sender;
        }
@@ -162,10 +162,22 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
        private void checkPythonProcessHealth() {
                try {
                        int value = process.exitValue();
+                       try {
+                               outPrinter.join();
+                       } catch (InterruptedException ignored) {
+                               outPrinter.interrupt();
+                               Thread.interrupted();
+                       }
+                       try {
+                               errorPrinter.join();
+                       } catch (InterruptedException ignored) {
+                               errorPrinter.interrupt();
+                               Thread.interrupted();
+                       }
                        if (value != 0) {
-                               throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+                               throw new RuntimeException("Plan file caused an error. Check log-files for details." + msg.get());
                        } else {
-                               throw new RuntimeException("Plan file exited prematurely without an error.");
+                               throw new RuntimeException("Plan file exited prematurely without an error." + msg.get());
                        }
                } catch (IllegalThreadStateException ignored) {//Process still running
                }

Reply via email to