Repository: flink
Updated Branches:
  refs/heads/master 99fb1f881 -> a2ec3ee66


[hotfix] [py] Improve error reporting in Python*InputStreamer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2ec3ee6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2ec3ee6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2ec3ee6

Branch: refs/heads/master
Commit: a2ec3ee664b540c1213991d7fcf56d8873e60d40
Parents: 154bb3b
Author: zentol <ches...@apache.org>
Authored: Thu Apr 20 14:08:54 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../python/api/streaming/data/PythonDualInputStreamer.java     | 6 +++---
 .../python/api/streaming/data/PythonSingleInputStreamer.java   | 6 ++----
 2 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2ec3ee6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
index b7e8a25..8c9fde9 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
-import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
 
@@ -46,9 +45,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho
         * @param iterator1 first input stream
         * @param iterator2 second input stream
         * @param c         collector
-        * @throws IOException
         */
-       public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) throws IOException {
+       public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) {
                SingleElementPushBackIterator<IN1> i1 = new SingleElementPushBackIterator<>(iterator1);
                SingleElementPushBackIterator<IN2> i2 = new SingleElementPushBackIterator<>(iterator2);
                try {
@@ -93,6 +91,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho
                        }
                } catch (SocketTimeoutException ignored) {
                        throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+               } catch (Exception e) {
+                       throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2ec3ee6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
index e7f018c..6c0a13c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
-import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
 
@@ -43,9 +42,8 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin
         *
         * @param iterator input stream
         * @param c        collector
-        * @throws IOException
         */
-       public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) throws IOException {
+       public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) {
                SingleElementPushBackIterator<IN> i = new SingleElementPushBackIterator<>(iterator);
                try {
                        int size;
@@ -86,7 +84,7 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin
                } catch (SocketTimeoutException ignored) {
                        throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg.get());
                } catch (Exception e) {
-                       throw new RuntimeException("Critical failure. " + msg.get(), e);
+                       throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e);
                }
        }
 }

Reply via email to