[FLINK-4567] [runtime] Enhance SerializedThrowable to properly mimic Exception 
causes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/761d0a02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/761d0a02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/761d0a02

Branch: refs/heads/master
Commit: 761d0a02505c7eaef7a566f978145b187c89cbf8
Parents: c251efc
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 2 11:38:53 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/util/SerializedThrowable.java | 88 ++++++++++----------
 .../runtime/util/SerializedThrowableTest.java   | 40 ++++++++-
 2 files changed, 83 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/761d0a02/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
index a7739ef..4dea59c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
@@ -21,18 +21,19 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 
-import java.io.IOException;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.Serializable;
 import java.lang.ref.WeakReference;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Utility class for dealing with user-defined Throwable types that are 
serialized (for
  * example during RPC/Actor communication), but cannot be resolved with the 
default
  * class loader.
- * <p>
- * This exception mimics the original exception with respect to message and 
stack trace,
+ * 
+ * <p>This exception mimics the original exception with respect to message and 
stack trace,
  * and contains the original exception in serialized form. The original 
exception
  * can be re-obtained by supplying the appropriate class loader.
  */
@@ -49,10 +50,6 @@ public class SerializedThrowable extends Exception 
implements Serializable {
        /** The original stack trace, to be printed */
        private final String fullStingifiedStackTrace;
 
-       /** A guaranteed serializable placeholder exception that will be used as
-        * cause and to capture the original stack trace */
-       private final Exception placeholder;
-       
        /** The original exception, not transported via serialization, 
         * because the class may not be part of the system class loader.
         * In addition, we make sure our cached references to not prevent
@@ -66,33 +63,43 @@ public class SerializedThrowable extends Exception 
implements Serializable {
         * @param exception The exception to serialize.
         */
        public SerializedThrowable(Throwable exception) {
+               this(exception, new HashSet<Throwable>());
+       }
+
+       private SerializedThrowable(Throwable exception, Set<Throwable> 
alreadySeen) {
                super(getMessageOrError(exception));
 
                if (!(exception instanceof SerializedThrowable)) {
-                       this.cachedException = new 
WeakReference<Throwable>(exception);
-                       
-                       this.originalErrorClassName = 
exception.getClass().getName();
-                       this.fullStingifiedStackTrace = 
ExceptionUtils.stringifyException(exception);
-                       this.placeholder = new Exception(
-                                       "Serialized representation of " + 
originalErrorClassName + ": " + getMessage());
-                       
this.placeholder.setStackTrace(exception.getStackTrace());
-                       initCause(this.placeholder);
-                       
+                       // serialize and memoize the original message
                        byte[] serialized;
                        try {
                                serialized = 
InstantiationUtil.serializeObject(exception);
                        }
                        catch (Throwable t) {
-                               // could not serialize exception. send the 
stringified version instead
-                               try {
-                                       serialized = 
InstantiationUtil.serializeObject(placeholder);
-                               }
-                               catch (IOException e) {
-                                       // this should really never happen, as 
we only serialize a a standard exception
-                                       throw new 
RuntimeException(e.getMessage(), e);
-                               }
+                               serialized = null;
                        }
                        this.serializedException = serialized;
+                       this.cachedException = new 
WeakReference<Throwable>(exception);
+
+                       // record the original exception's properties (name, 
stack prints)
+                       this.originalErrorClassName = 
exception.getClass().getName();
+                       this.fullStingifiedStackTrace = 
ExceptionUtils.stringifyException(exception);
+
+                       // mimic the original exception's stack trace
+                       setStackTrace(exception.getStackTrace());
+
+                       // mimic the original exception's cause
+                       if (exception.getCause() == null) {
+                               initCause(null);
+                       }
+                       else {
+                               // exception causes may by cyclic, so we 
truncate the cycle when we find it 
+                               if (alreadySeen.add(exception)) {
+                                       // we are not in a cycle, yet
+                                       initCause(new 
SerializedThrowable(exception.getCause(), alreadySeen));
+                               }
+                       }
+
                }
                else {
                        // copy from that serialized throwable
@@ -100,39 +107,37 @@ public class SerializedThrowable extends Exception 
implements Serializable {
                        this.serializedException = other.serializedException;
                        this.originalErrorClassName = 
other.originalErrorClassName;
                        this.fullStingifiedStackTrace = 
other.fullStingifiedStackTrace;
-                       this.placeholder = other.placeholder;
                        this.cachedException = other.cachedException;
                }
        }
 
        public Throwable deserializeError(ClassLoader classloader) {
+               if (serializedException == null) {
+                       // failed to serialize the original exception
+                       // return this SerializedThrowable as a stand in
+                       return this;
+               }
+
                Throwable cached = cachedException == null ? null : 
cachedException.get();
                if (cached == null) {
                        try {
                                cached = 
InstantiationUtil.deserializeObject(serializedException, classloader);
                                cachedException = new 
WeakReference<Throwable>(cached);
                        }
-                       catch (Exception e) {
-                               return placeholder;
+                       catch (Throwable t) {
+                               // something went wrong
+                               // return this SerializedThrowable as a stand in
+                               return this;
                        }
                }
                return cached;
        }
-       
-       public String getStrigifiedStackTrace() {
-               return fullStingifiedStackTrace;
-       }
-       
+
        // 
------------------------------------------------------------------------
        //  Override the behavior of Throwable
        // 
------------------------------------------------------------------------
 
        @Override
-       public Throwable getCause() {
-               return placeholder;
-       }
-
-       @Override
        public void printStackTrace(PrintStream s) {
                s.print(fullStingifiedStackTrace);
                s.flush();
@@ -150,15 +155,10 @@ public class SerializedThrowable extends Exception 
implements Serializable {
                return (message != null) ? (originalErrorClassName + ": " + 
message) : originalErrorClassName;
        }
 
-       @Override
-       public StackTraceElement[] getStackTrace() {
-               return placeholder.getStackTrace();
-       }
-
        // 
------------------------------------------------------------------------
        //  Static utilities
        // 
------------------------------------------------------------------------
-       
+
        public static Throwable get(Throwable serThrowable, ClassLoader loader) 
{
                if (serThrowable instanceof SerializedThrowable) {
                        return 
((SerializedThrowable)serThrowable).deserializeError(loader);
@@ -166,7 +166,7 @@ public class SerializedThrowable extends Exception 
implements Serializable {
                        return serThrowable;
                }
        }
-       
+
        private static String getMessageOrError(Throwable error) {
                try {
                        return error.getMessage();

http://git-wip-us.apache.org/repos/asf/flink/blob/761d0a02/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
index 50efd52..4d57892 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -131,10 +131,48 @@ public class SerializedThrowableTest {
                        // deserialize the proper exception
                        Throwable deserialized = copy.deserializeError(loader); 
                        assertEquals(clazz, deserialized.getClass());
+
+                       // deserialization with the wrong classloader does not 
lead to a failure
+                       Throwable wronglyDeserialized = 
copy.deserializeError(getClass().getClassLoader());
+                       
assertEquals(ExceptionUtils.stringifyException(userException),
+                                       
ExceptionUtils.stringifyException(wronglyDeserialized));
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-       } 
+       }
+
+       @Test
+       public void testCauseChaining() {
+               Exception cause2 = new Exception("level2");
+               Exception cause1 = new Exception("level1", cause2);
+               Exception root = new Exception("level0", cause1);
+
+               SerializedThrowable st = new SerializedThrowable(root);
+
+               assertEquals("level0", st.getMessage());
+
+               assertNotNull(st.getCause());
+               assertEquals("level1", st.getCause().getMessage());
+
+               assertNotNull(st.getCause().getCause());
+               assertEquals("level2", st.getCause().getCause().getMessage());
+       }
+
+       @Test
+       public void testCyclicCauseChaining() {
+               Exception cause3 = new Exception("level3");
+               Exception cause2 = new Exception("level2", cause3);
+               Exception cause1 = new Exception("level1", cause2);
+               Exception root = new Exception("level0", cause1);
+
+               // introduce a cyclic reference
+               cause3.initCause(cause1);
+
+               SerializedThrowable st = new SerializedThrowable(root);
+
+               assertArrayEquals(root.getStackTrace(), st.getStackTrace());
+               assertEquals(ExceptionUtils.stringifyException(root), 
ExceptionUtils.stringifyException(st));
+       }
 }

Reply via email to