[FLINK-2067] [runtime] Unwrap the ExceptionInChainedOperatorException 
exceptions to clean up stack traces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6181302f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6181302f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6181302f

Branch: refs/heads/table-retraction
Commit: 6181302f1ab741b86af357e4513f5952a5fc1531
Parents: c9623be
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 2 22:48:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 2 23:11:03 2017 +0200

----------------------------------------------------------------------
 .../flink/util/WrappingRuntimeException.java    | 54 +++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  7 +++
 .../flink/runtime/taskmanager/TaskTest.java     | 57 ++++++++++++++++++--
 .../ExceptionInChainedOperatorException.java    | 11 +---
 .../streaming/runtime/tasks/OperatorChain.java  |  4 +-
 5 files changed, 120 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java 
b/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
new file mode 100644
index 0000000..f9306df
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A runtime exception that is explicitly used to wrap non-runtime exceptions.
+ * 
+ * <p>The exception is recognized (for example by the Task when reporting 
exceptions as
+ * failure causes) and unwrapped to avoid including the wrapper's stack trace 
in the reports.
+ * That way, exception traces are keeping to the important parts.
+ */
+public class WrappingRuntimeException extends FlinkRuntimeException {
+
+       private static final long serialVersionUID = 1L;
+
+       public WrappingRuntimeException(@Nonnull Throwable cause) {
+               super(checkNotNull(cause));
+       }
+
+       public WrappingRuntimeException(String message, @Nonnull Throwable 
cause) {
+               super(message, checkNotNull(cause));
+       }
+
+       /**
+        * Recursively unwraps this WrappingRuntimeException and its causes, 
getting the first
+        * non wrapping exception.
+        * 
+        * @return The first cause that is not a wrapping exception.
+        */
+       public Throwable unwrap() {
+               Throwable cause = getCause();
+               return (cause instanceof WrappingRuntimeException) ? 
((WrappingRuntimeException) cause).unwrap() : cause;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dab0f95..e626dae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -71,6 +71,8 @@ import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.util.WrappingRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -727,6 +729,11 @@ public class Task implements Runnable, TaskActions {
                }
                catch (Throwable t) {
 
+                       // unwrap wrapped exceptions to make stack traces more 
compact
+                       if (t instanceof WrappingRuntimeException) {
+                               t = ((WrappingRuntimeException) t).unwrap();
+                       }
+
                        // 
----------------------------------------------------------------
                        // the execution failed. either the invokable code 
properly failed, or
                        // an exception was thrown as a side effect of 
cancelling

http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 2522287..56a3b07 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -59,11 +59,13 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.WrappingRuntimeException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URL;
@@ -117,9 +119,9 @@ public class TaskTest extends TestLogger {
        
        @Before
        public void createQueuesAndActors() {
-               taskManagerMessages = new LinkedBlockingQueue<Object>();
-               jobManagerMessages = new LinkedBlockingQueue<Object>();
-               listenerMessages = new LinkedBlockingQueue<Object>();
+               taskManagerMessages = new LinkedBlockingQueue<>();
+               jobManagerMessages = new LinkedBlockingQueue<>();
+               listenerMessages = new LinkedBlockingQueue<>();
                taskManagerGateway = new 
ForwardingActorGateway(taskManagerMessages);
                jobManagerGateway = new 
ForwardingActorGateway(jobManagerMessages);
                listenerGateway = new ForwardingActorGateway(listenerMessages);
@@ -335,6 +337,32 @@ public class TaskTest extends TestLogger {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testFailWithWrappedException() {
+               try {
+                       Task task = 
createTask(FailingInvokableWithChainedException.class);
+                       task.registerExecutionListener(listener);
+
+                       task.run();
+
+                       assertEquals(ExecutionState.FAILED, 
task.getExecutionState());
+                       assertTrue(task.isCanceledOrFailed());
+
+                       Throwable cause = task.getFailureCause();
+                       assertTrue(cause instanceof IOException);
+
+                       validateTaskManagerStateChange(ExecutionState.RUNNING, 
task, false);
+                       validateUnregisterTask(task.getExecutionId());
+
+                       validateListenerMessage(ExecutionState.RUNNING, task, 
false);
+                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
        
        @Test
        public void testCancelDuringInvoke() {
@@ -1232,4 +1260,27 @@ public class TaskTest extends TestLogger {
                public void cancel() throws Exception {
                }
        }
+
+       public static final class FailingInvokableWithChainedException extends 
AbstractInvokable {
+
+               @Override
+               public void invoke() throws Exception {
+                       throw new TestWrappedException(new IOException("test"));
+               }
+
+               @Override
+               public void cancel() {}
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test exceptions
+       // 
------------------------------------------------------------------------
+
+       private static class TestWrappedException extends 
WrappingRuntimeException {
+               private static final long serialVersionUID = 1L;
+
+               public TestWrappedException(@Nonnull Throwable cause) {
+                       super(cause);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
index 77c80c9..d4027bf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
@@ -21,12 +21,13 @@ package org.apache.flink.streaming.runtime.tasks;
 import static java.util.Objects.requireNonNull;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.WrappingRuntimeException;
 
 /**
  * A special exception that signifies that the cause exception came from a 
chained operator.
  */
 @Internal
-public class ExceptionInChainedOperatorException extends RuntimeException {
+public class ExceptionInChainedOperatorException extends 
WrappingRuntimeException {
 
        private static final long serialVersionUID = 1L;
 
@@ -37,12 +38,4 @@ public class ExceptionInChainedOperatorException extends 
RuntimeException {
        public ExceptionInChainedOperatorException(String message, Throwable 
cause) {
                super(message, requireNonNull(cause));
        }
-
-       public Throwable getOriginalCause() {
-               Throwable ex = this;
-               do {
-                       ex = ex.getCause();
-               } while (ex instanceof ExceptionInChainedOperatorException);
-               return ex;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b85461d..870c2ed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -54,6 +55,7 @@ import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -525,7 +527,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                                operator.setKeyContextElement1(copy);
                                operator.processElement(copy);
                        } catch (Exception e) {
-                               throw new RuntimeException("Could not forward 
element to next operator", e);
+                               throw new 
ExceptionInChainedOperatorException(e);
                        }
 
                }

Reply via email to