[hotfix] [py] Code cleanup - StreamPrinter

- implements Runnable instead of extending Thread
- use AtomicReference<String> instead of StringBuilder
- remove redundant wrapInException argument


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/258ed179
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/258ed179
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/258ed179

Branch: refs/heads/table-retraction
Commit: 258ed1791a3e66443b29e4a4e14dcc4e03ffd9e6
Parents: f4324ba
Author: zentol <ches...@apache.org>
Authored: Thu Mar 30 19:41:54 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 5 20:43:55 2017 +0200

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      |  7 +++--
 .../api/streaming/plan/PythonPlanStreamer.java  |  4 +--
 .../api/streaming/util/StreamPrinter.java       | 30 ++++++++++++--------
 3 files changed, 24 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/258ed179/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 219ae27..3c79b1f 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -32,6 +32,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -71,7 +72,7 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
        protected S sender;
        protected PythonReceiver<OUT> receiver;
 
-       protected StringBuilder msg = new StringBuilder();
+       protected AtomicReference<String> msg = new AtomicReference<>();
 
        protected final AbstractRichFunction function;
 
@@ -118,9 +119,9 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
                }
 
                process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B 
" + planPath + planArguments);
-               outPrinter = new StreamPrinter(process.getInputStream());
+               outPrinter = new Thread(new 
StreamPrinter(process.getInputStream()));
                outPrinter.start();
-               errorPrinter = new StreamPrinter(process.getErrorStream(), 
true, msg);
+               errorPrinter = new Thread(new 
StreamPrinter(process.getErrorStream(), msg));
                errorPrinter.start();
 
                shutdownThread = new Thread() {

http://git-wip-us.apache.org/repos/asf/flink/blob/258ed179/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 4eb0f51..9b62563 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -67,8 +67,8 @@ public class PythonPlanStreamer {
                }
                process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + 
tmpPath + FLINK_PYTHON_PLAN_NAME + args);
 
-               new StreamPrinter(process.getInputStream()).start();
-               new StreamPrinter(process.getErrorStream()).start();
+               new Thread(new StreamPrinter(process.getInputStream())).start();
+               new Thread(new StreamPrinter(process.getErrorStream())).start();
 
                server = new ServerSocket(0);
                server.setSoTimeout(50);

http://git-wip-us.apache.org/repos/asf/flink/blob/258ed179/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
index 30a728c..c6a1ede 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
@@ -18,40 +18,46 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Simple utility class to print all contents of an inputstream to stdout.
  */
-public class StreamPrinter extends Thread {
+public class StreamPrinter implements Runnable {
        private final BufferedReader reader;
-       private final boolean wrapInException;
-       private StringBuilder msg;
+       private final AtomicReference<String> output;
 
        public StreamPrinter(InputStream stream) {
-               this(stream, false, null);
+               this(stream, null);
        }
 
-       public StreamPrinter(InputStream stream, boolean wrapInException, 
StringBuilder msg) {
+       public StreamPrinter(InputStream stream, AtomicReference<String> 
output) {
                this.reader = new BufferedReader(new InputStreamReader(stream, 
ConfigConstants.DEFAULT_CHARSET));
-               this.wrapInException = wrapInException;
-               this.msg = msg;
+               this.output = output;
        }
 
        @Override
        public void run() {
                String line;
-               try {
-                       if (wrapInException) {
+               if (output != null) {
+                       StringBuilder msg = new StringBuilder();
+                       try {
                                while ((line = reader.readLine()) != null) {
-                                       msg.append("\n" + line);
+                                       msg.append(line);
+                                       msg.append("\n");
                                }
-                       } else {
+                       } catch (IOException ignored) {
+                       } finally {
+                               output.set(msg.toString());
+                       }
+               } else {
+                       try {
                                while ((line = reader.readLine()) != null) {
                                        System.out.println(line);
                                        System.out.flush();
                                }
+                       } catch (IOException ignored) {
                        }
-               } catch (IOException ex) {
                }
        }
 }

Reply via email to