Repository: spark
Updated Branches:
  refs/heads/master 92fc0a8f9 -> 466d011d3


[SPARK-26117][CORE][SQL] Use SparkOutOfMemoryError instead of OutOfMemoryError when catching exceptions

## What changes were proposed in this pull request?

PR #20014 introduced `SparkOutOfMemoryError` so that a failed Spark memory allocation no longer kills the entire executor the way a JVM `OutOfMemoryError` would. This PR applies that change to the call sites that request memory via `MemoryConsumer.allocatePage` and related paths: where they previously caught `OutOfMemoryError`, they now catch `SparkOutOfMemoryError`, so a genuine JVM `OutOfMemoryError` is no longer swallowed.
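
The change is mechanical at every call site. A minimal before/after sketch of the pattern, based on the `acquireNewPage` path in `BytesToBytesMap` in the diff below (surrounding fields elided):

```java
// Before: catching java.lang.OutOfMemoryError also swallows genuine JVM OOMs,
// which should be allowed to kill the executor.
try {
  currentPage = allocatePage(required);
} catch (OutOfMemoryError e) {
  return false;
}

// After: only Spark's own execution-memory failure is handled here; a real
// JVM OutOfMemoryError still propagates.
try {
  currentPage = allocatePage(required);
} catch (SparkOutOfMemoryError e) {
  return false;
}
```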

## How was this patch tested?
N/A

Closes #23084 from heary-cao/SparkOutOfMemoryError.

Authored-by: caoxuewen <cao.xue...@zte.com.cn>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/466d011d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/466d011d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/466d011d

Branch: refs/heads/master
Commit: 466d011d3515723653e41d8b1d0b6150b9945f52
Parents: 92fc0a8
Author: caoxuewen <cao.xue...@zte.com.cn>
Authored: Fri Nov 23 21:12:25 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Nov 23 21:12:25 2018 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/spark/memory/MemoryConsumer.java | 10 +++++-----
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java |  5 +++--
 .../collection/unsafe/sort/UnsafeExternalSorterSuite.java |  7 ++++---
 .../collection/unsafe/sort/UnsafeInMemorySorterSuite.java |  5 +++--
 .../sql/catalyst/expressions/RowBasedKeyValueBatch.java   |  3 ++-
 .../org/apache/spark/sql/execution/python/RowQueue.scala  |  4 ++--
 6 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/466d011d/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 8371dec..4bfd2d3 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -83,10 +83,10 @@ public abstract class MemoryConsumer {
   public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
 
   /**
-   * Allocates a LongArray of `size`. Note that this method may throw `OutOfMemoryError` if Spark
-   * doesn't have enough memory for this allocation, or throw `TooLargePageException` if this
-   * `LongArray` is too large to fit in a single page. The caller side should take care of these
-   * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
+   * Allocates a LongArray of `size`. Note that this method may throw `SparkOutOfMemoryError`
+   * if Spark doesn't have enough memory for this allocation, or throw `TooLargePageException`
+   * if this `LongArray` is too large to fit in a single page. The caller side should take care of
+   * these two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
    *
    * @throws SparkOutOfMemoryError
    * @throws TooLargePageException
@@ -111,7 +111,7 @@ public abstract class MemoryConsumer {
   /**
    * Allocate a memory block with at least `required` bytes.
    *
-   * @throws OutOfMemoryError
+   * @throws SparkOutOfMemoryError
    */
   protected MemoryBlock allocatePage(long required) {
     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
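
The updated javadoc above asks callers of `allocateArray` to handle both exceptions. A hedged caller-side sketch (the helper name `tryAllocateArray` and the null-returning fallback are illustrative, not part of this commit; the types come from the code above):

```java
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TooLargePageException;
import org.apache.spark.unsafe.array.LongArray;

// Inside a MemoryConsumer subclass: attempt the allocation and degrade
// gracefully instead of letting either exception escape.
private LongArray tryAllocateArray(long size) {
  try {
    return allocateArray(size);
  } catch (SparkOutOfMemoryError e) {
    // Spark could not grant the execution memory; fall back (e.g. spill)
    // rather than failing the whole executor.
    return null;
  } catch (TooLargePageException e) {
    // The requested LongArray exceeds a single page; the caller must retry
    // with a smaller size.
    return null;
  }
}
```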

http://git-wip-us.apache.org/repos/asf/spark/blob/466d011d/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9b6cbab..a4e8859 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
@@ -741,7 +742,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
         if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
           try {
             growAndRehash();
-          } catch (OutOfMemoryError oom) {
+          } catch (SparkOutOfMemoryError oom) {
             canGrowArray = false;
           }
         }
@@ -757,7 +758,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
   private boolean acquireNewPage(long required) {
     try {
       currentPage = allocatePage(required);
-    } catch (OutOfMemoryError e) {
+    } catch (SparkOutOfMemoryError e) {
       return false;
     }
     dataPages.add(currentPage);

http://git-wip-us.apache.org/repos/asf/spark/blob/466d011d/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 411cd5c..d1b29d9 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -38,6 +38,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.JavaSerializer;
 import org.apache.spark.serializer.SerializerInstance;
@@ -534,10 +535,10 @@ public class UnsafeExternalSorterSuite {
       insertNumber(sorter, 1024);
       fail("expected OutOfMmoryError but it seems operation surprisingly 
succeeded");
     }
-    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
-    catch (OutOfMemoryError oom){
+    // we expect an SparkOutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch (SparkOutOfMemoryError oom){
       String oomStackTrace = Utils.exceptionString(oom);
-      assertThat("expected OutOfMemoryError in " +
+      assertThat("expected SparkOutOfMemoryError in " +
         
"org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
         oomStackTrace,
         Matchers.containsString(

http://git-wip-us.apache.org/repos/asf/spark/blob/466d011d/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 85ffdca..b0d485f 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -27,6 +27,7 @@ import org.apache.spark.HashPartitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -178,8 +179,8 @@ public class UnsafeInMemorySorterSuite {
     testMemoryManager.markExecutionAsOutOfMemoryOnce();
     try {
       sorter.reset();
-      fail("expected OutOfMmoryError but it seems operation surprisingly 
succeeded");
-    } catch (OutOfMemoryError oom) {
+      fail("expected SparkOutOfMemoryError but it seems operation surprisingly 
succeeded");
+    } catch (SparkOutOfMemoryError oom) {
       // as expected
     }
     // [SPARK-21907] this failed on NPE at

http://git-wip-us.apache.org/repos/asf/spark/blob/466d011d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
index 4605138..6344cf1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -126,7 +127,7 @@ public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Cl
   private boolean acquirePage(long requiredSize) {
     try {
       page = allocatePage(requiredSize);
-    } catch (OutOfMemoryError e) {
+    } catch (SparkOutOfMemoryError e) {
       logger.warn("Failed to allocate page ({} bytes).", requiredSize);
       return false;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/466d011d/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
index d2820ff..eb12641 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Closeables
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.io.NioBufferedFileInputStream
-import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryConsumer, SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.unsafe.Platform
@@ -226,7 +226,7 @@ private[python] case class HybridRowQueue(
     val page = try {
       allocatePage(required)
     } catch {
-      case _: OutOfMemoryError =>
+      case _: SparkOutOfMemoryError =>
         null
     }
     val buffer = if (page != null) {

