[FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e227b101
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e227b101
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e227b101

Branch: refs/heads/master
Commit: e227b10134e387f3c49804dc0cc4c223c30702e3
Parents: 761d0a0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 2 11:45:25 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../partition/ProducerFailedException.java       | 19 +++++--------------
 .../partition/ProducerFailedExceptionTest.java   | 12 ++++++------
 2 files changed, 11 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e227b101/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
index 2b2acab..934234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
  * Network-stack level Exception to notify remote receiver about a failed
@@ -29,23 +29,14 @@ public class ProducerFailedException extends CancelTaskException {
 
        private static final long serialVersionUID = -1555492656299526395L;
 
-       private final String causeAsString;
-
        /**
         * The cause of the producer failure.
         *
-        * Note: The cause will be stringified, because it might be an instance of
-        * a user level Exception, which can not be deserialized by the remote
-        * receiver's system class loader.
+        * <p>The cause will be stored as a {@link SerializedThrowable}, because it might
+        * be an instance of a user level Exception, which may not be possible to deserialize
+        * by the remote receiver's system class loader.
         */
        public ProducerFailedException(Throwable cause) {
-               this.causeAsString = cause != null ? ExceptionUtils.stringifyException(cause) : null;
-       }
-
-       /**
-        * Returns the stringified cause of the producer failure.
-        */
-       public String getCauseAsString() {
-               return causeAsString;
+               super(new SerializedThrowable(cause));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e227b101/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 042c136..ca2de0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -19,27 +19,27 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.util.SerializedThrowable;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class ProducerFailedExceptionTest {
 
        @Test
        public void testInstanceOfCancelTaskException() throws Exception {
-               ProducerFailedException e = new ProducerFailedException(new Exception());
-               assertTrue(e instanceof CancelTaskException);
+               assertTrue(CancelTaskException.class.isAssignableFrom(ProducerFailedException.class));
        }
 
        @Test
-       public void testCauseIsStringified() throws Exception {
+       public void testCauseIsSerialized() throws Exception {
                // Tests that the cause is stringified, because it might be an instance
                // of a user level Exception, which can not be deserialized by the
                // remote receiver's system class loader.
                ProducerFailedException e = new ProducerFailedException(new Exception());
-               assertNull(e.getCause());
-               assertNotNull(e.getCauseAsString());
+               assertNotNull(e.getCause());
+               assertTrue(e.getCause() instanceof SerializedThrowable);
        }
 }

Reply via email to