[FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e227b101 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e227b101 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e227b101 Branch: refs/heads/master Commit: e227b10134e387f3c49804dc0cc4c223c30702e3 Parents: 761d0a0 Author: Stephan Ewen <se...@apache.org> Authored: Fri Sep 2 11:45:25 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Sep 2 17:32:57 2016 +0200 ---------------------------------------------------------------------- .../partition/ProducerFailedException.java | 19 +++++-------------- .../partition/ProducerFailedExceptionTest.java | 12 ++++++------ 2 files changed, 11 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e227b101/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java index 2b2acab..934234d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.runtime.util.SerializedThrowable; /** * Network-stack level Exception to notify remote receiver about a failed @@ -29,23 +29,14 @@ public class ProducerFailedException extends CancelTaskException { private static final long serialVersionUID = 
-1555492656299526395L; - private final String causeAsString; - /** * The cause of the producer failure. * - * Note: The cause will be stringified, because it might be an instance of - * a user level Exception, which can not be deserialized by the remote - * receiver's system class loader. + * <p>The cause will be stored as a {@link SerializedThrowable}, because it might + * be an instance of a user level Exception, which may not be possible to deserialize + * by the remote receiver's system class loader. */ public ProducerFailedException(Throwable cause) { - this.causeAsString = cause != null ? ExceptionUtils.stringifyException(cause) : null; - } - - /** - * Returns the stringified cause of the producer failure. - */ - public String getCauseAsString() { - return causeAsString; + super(new SerializedThrowable(cause)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e227b101/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java index 042c136..ca2de0c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java @@ -19,27 +19,27 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.execution.CancelTaskException; +import org.apache.flink.runtime.util.SerializedThrowable; + import org.junit.Test; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class ProducerFailedExceptionTest { @Test public void 
testInstanceOfCancelTaskException() throws Exception { - ProducerFailedException e = new ProducerFailedException(new Exception()); - assertTrue(e instanceof CancelTaskException); + assertTrue(CancelTaskException.class.isAssignableFrom(ProducerFailedException.class)); } @Test - public void testCauseIsStringified() throws Exception { + public void testCauseIsSerialized() throws Exception { // Tests that the cause is wrapped as a SerializedThrowable, because it might be an instance // of a user level Exception, which can not be deserialized by the // remote receiver's system class loader. ProducerFailedException e = new ProducerFailedException(new Exception()); - assertNull(e.getCause()); - assertNotNull(e.getCauseAsString()); + assertNotNull(e.getCause()); + assertTrue(e.getCause() instanceof SerializedThrowable); } }