Repository: flink Updated Branches: refs/heads/master 99fb1f881 -> a2ec3ee66
[hotfix] [py] Improve error reporting in Python*InputStreamer Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2ec3ee6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2ec3ee6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2ec3ee6 Branch: refs/heads/master Commit: a2ec3ee664b540c1213991d7fcf56d8873e60d40 Parents: 154bb3b Author: zentol <ches...@apache.org> Authored: Thu Apr 20 14:08:54 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Thu May 4 12:56:39 2017 +0200 ---------------------------------------------------------------------- .../python/api/streaming/data/PythonDualInputStreamer.java | 6 +++--- .../python/api/streaming/data/PythonSingleInputStreamer.java | 6 ++---- 2 files changed, 5 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a2ec3ee6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java index b7e8a25..8c9fde9 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; -import java.io.IOException; import java.net.SocketTimeoutException; import java.util.Iterator; @@ -46,9 +45,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho * @param iterator1 first input stream * @param iterator2 second input stream * @param c collector - * @throws IOException */ - public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) throws IOException { + public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) { SingleElementPushBackIterator<IN1> i1 = new SingleElementPushBackIterator<>(iterator1); SingleElementPushBackIterator<IN2> i2 = new SingleElementPushBackIterator<>(iterator2); try { @@ -93,6 +91,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho } } catch (SocketTimeoutException ignored) { throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg); + } catch (Exception e) { + throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a2ec3ee6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java index e7f018c..6c0a13c 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; -import java.io.IOException; import java.net.SocketTimeoutException; import java.util.Iterator; @@ -43,9 +42,8 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin * * @param iterator input stream * @param c collector - * @throws IOException */ - public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) throws IOException { + public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) { SingleElementPushBackIterator<IN> i = new SingleElementPushBackIterator<>(iterator); try { int size; @@ -86,7 +84,7 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin } catch (SocketTimeoutException ignored) { throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg.get()); } catch (Exception e) { - throw new RuntimeException("Critical failure. " + msg.get(), e); + throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e); } } }