Repository: spark Updated Branches: refs/heads/master 6ceb16960 -> 0f92be5b5
[SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffle writer Author: Holden Karau <hol...@pigscanfly.ca> Closes #6918 from holdenk/SPARK-8498-fix-npe-in-errorhandling-path-in-unsafeshuffle-writer and squashes the following commits: f807832 [Holden Karau] Log error if we can't throw it 855f9aa [Holden Karau] Spelling - not my strongest suite. Fix Propegates to Propagates. 039d620 [Holden Karau] Add missing closeandwriteoutput 30e558d [Holden Karau] go back to try/finally e503b8c [Holden Karau] Improve the test to ensure we aren't masking the underlying exception ae0b7a7 [Holden Karau] Fix the test 2e6abf7 [Holden Karau] Be more cautious when cleaning up during failed write and re-throw user exceptions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f92be5b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f92be5b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f92be5b Branch: refs/heads/master Commit: 0f92be5b5f017b593bd29d4da7e89aad2b3adac2 Parents: 6ceb169 Author: Holden Karau <hol...@pigscanfly.ca> Authored: Tue Jun 23 09:08:11 2015 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Tue Jun 23 09:08:11 2015 -0700 ---------------------------------------------------------------------- .../spark/shuffle/unsafe/UnsafeShuffleWriter.java | 18 ++++++++++++++++-- .../shuffle/unsafe/UnsafeShuffleWriterSuite.java | 17 +++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0f92be5b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index ad7eb04..764578b 100644 --- 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -139,6 +139,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { @Override public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException { + // Keep track of success so we know if we encountered an exception + // We do this rather than a standard try/catch/re-throw to handle + // generic throwables. boolean success = false; try { while (records.hasNext()) { @@ -147,8 +150,19 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { closeAndWriteOutput(); success = true; } finally { - if (!success) { - sorter.cleanupAfterError(); + if (sorter != null) { + try { + sorter.cleanupAfterError(); + } catch (Exception e) { + // Only throw this error if we won't be masking another + // error. + if (success) { + throw e; + } else { + logger.error("In addition to a failure during writing, we failed during " + + "cleanup.", e); + } + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/0f92be5b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index 83d1091..10c3eed 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -253,6 +253,23 @@ public class UnsafeShuffleWriterSuite { createWriter(false).stop(false); } + class PandaException extends RuntimeException { + } + + @Test(expected=PandaException.class) + public void writeFailurePropagates() throws Exception { + class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> { + 
@Override public boolean hasNext() { + throw new PandaException(); + } + @Override public Product2<Object, Object> next() { + return null; + } + } + final UnsafeShuffleWriter<Object, Object> writer = createWriter(true); + writer.write(new BadRecords()); + } + @Test public void writeEmptyIterator() throws Exception { final UnsafeShuffleWriter<Object, Object> writer = createWriter(true); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org