Repository: spark
Updated Branches:
  refs/heads/master 6129ffa11 -> 3a7494dfe


[SPARK-22827][CORE] Avoid throwing OutOfMemoryError in case of exception in spill

## What changes were proposed in this pull request?
Currently, the task memory manager throws an OutOfMemoryError when an IO
exception happens in spill() -
https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L194.
Similarly, there are many other places in the code where, if a task is unable
to acquire memory due to an exception, we throw an OutOfMemoryError, which
kills the entire executor and hence fails all the tasks running on that
executor, instead of failing just the one task.
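
At a high level, the fix adds a SparkOutOfMemoryError that extends
OutOfMemoryError but is treated as a task-level failure rather than an
executor-fatal one. A minimal Java sketch of the intended distinction
(runTask and markTaskFailed are hypothetical placeholders, not Spark APIs):

    try {
      runTask();
    } catch (SparkOutOfMemoryError e) {
      // Spark could not grant the requested execution memory: fail only
      // this task and let the scheduler retry it, keeping the executor alive.
      markTaskFailed(e);
    }
    // A genuine java.lang.OutOfMemoryError still propagates and brings down
    // the executor, which is the right response to real heap exhaustion.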

## How was this patch tested?

Unit tests

Author: Sital Kedia <ske...@fb.com>

Closes #20014 from sitalkedia/skedia/upstream_SPARK-22827.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a7494df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a7494df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a7494df

Branch: refs/heads/master
Commit: 3a7494dfee714510f6a62d5533023f3e0d5ccdcd
Parents: 6129ffa
Author: Sital Kedia <ske...@fb.com>
Authored: Wed Dec 20 12:21:00 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Dec 20 12:21:00 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/memory/MemoryConsumer.java |  4 +--
 .../spark/memory/SparkOutOfMemoryError.java     | 36 ++++++++++++++++++++
 .../apache/spark/memory/TaskMemoryManager.java  |  4 +--
 .../shuffle/sort/ShuffleExternalSorter.java     |  3 +-
 .../unsafe/sort/UnsafeExternalSorter.java       |  3 +-
 .../unsafe/sort/UnsafeInMemorySorter.java       |  3 +-
 .../org/apache/spark/executor/Executor.scala    |  5 ++-
 .../aggregate/TungstenAggregationIterator.scala |  3 +-
 8 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a7494df/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 2dff241..a7bd4b3 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -88,7 +88,7 @@ public abstract class MemoryConsumer {
   * `LongArray` is too large to fit in a single page. The caller side should take care of these
   * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
    *
-   * @throws OutOfMemoryError
+   * @throws SparkOutOfMemoryError
    * @throws TooLargePageException
    */
   public LongArray allocateArray(long size) {
@@ -154,6 +154,6 @@ public abstract class MemoryConsumer {
       taskMemoryManager.freePage(page, this);
     }
     taskMemoryManager.showMemoryUsage();
-    throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+    throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
   }
 }
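
Callers of allocateArray now see SparkOutOfMemoryError instead of a bare
OutOfMemoryError. A hedged sketch of the caller-side handling described by
the Javadoc above (growBuffer is a hypothetical consumer method, not part
of the patch):

    // Inside a MemoryConsumer subclass; both failure modes documented on
    // allocateArray are handled by spilling to disk.
    private void growBuffer(long newSize) throws IOException {
      try {
        LongArray bigger = allocateArray(newSize);
        // ... copy old contents into `bigger`, then free the old array ...
      } catch (TooLargePageException e) {
        spill();  // the request cannot fit in a single memory page
      } catch (SparkOutOfMemoryError e) {
        spill();  // memory could not be acquired; free some and retry later
      }
    }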

http://git-wip-us.apache.org/repos/asf/spark/blob/3a7494df/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
new file mode 100644
index 0000000..ca00ca5
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.memory;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * This exception is thrown when a task cannot acquire memory from the memory manager.
+ * Instead of throwing {@link OutOfMemoryError}, which kills the executor,
+ * we should throw this exception, which kills just the current task.
+ */
+@Private
+public final class SparkOutOfMemoryError extends OutOfMemoryError {
+
+    public SparkOutOfMemoryError(String s) {
+        super(s);
+    }
+
+    public SparkOutOfMemoryError(OutOfMemoryError e) {
+        super(e.getMessage());
+    }
+}
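
Note that the copy constructor above preserves only the message of the
wrapped error, not its stack trace. A hedged usage sketch of rewrapping
(doAllocation is a hypothetical placeholder):

    try {
      doAllocation();
    } catch (OutOfMemoryError oom) {
      // Rewrap so the Executor treats this as a task failure rather than
      // a fatal JVM error; the original stack trace is dropped.
      throw new SparkOutOfMemoryError(oom);
    }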

http://git-wip-us.apache.org/repos/asf/spark/blob/3a7494df/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index f6b5ea3..e8d3730 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -192,7 +192,7 @@ public class TaskMemoryManager {
             throw new RuntimeException(e.getMessage());
           } catch (IOException e) {
             logger.error("error while calling spill() on " + c, e);
-            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
               + e.getMessage());
           }
         }
@@ -213,7 +213,7 @@ public class TaskMemoryManager {
           throw new RuntimeException(e.getMessage());
         } catch (IOException e) {
           logger.error("error while calling spill() on " + consumer, e);
-          throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
+          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
             + e.getMessage());
         }
       }
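
The IOException wrapped here comes from the MemoryConsumer spill contract,
whose signature declares it. A hedged sketch of a consumer whose spill fails
with I/O (FileBackedConsumer is hypothetical; the single-argument
MemoryConsumer constructor is assumed from this version of the class):

    // With this patch, the IOException below is wrapped in
    // SparkOutOfMemoryError by TaskMemoryManager, failing only the calling
    // task instead of the whole executor.
    class FileBackedConsumer extends MemoryConsumer {
      FileBackedConsumer(TaskMemoryManager tmm) {
        super(tmm);
      }

      @Override
      public long spill(long size, MemoryConsumer trigger) throws IOException {
        throw new IOException("simulated disk failure during spill");
      }
    }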

http://git-wip-us.apache.org/repos/asf/spark/blob/3a7494df/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index e80f973..c3a07b2 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -33,6 +33,7 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.DummySerializerInstance;
@@ -337,7 +338,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
        // The pointer array is too big to fit in a single page, spill.
         spill();
         return;
-      } catch (OutOfMemoryError e) {
+      } catch (SparkOutOfMemoryError e) {
        // should have triggered spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {
           logger.error("Unable to grow the pointer array");
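
Both ShuffleExternalSorter and UnsafeExternalSorter use this same
grow-or-spill pattern, so narrowing the catch from OutOfMemoryError to
SparkOutOfMemoryError means a real JVM out-of-memory is no longer swallowed
here. A condensed, hedged sketch of the surrounding method (simplified from
growPointerArrayIfNecessary; details may differ):

    private void growPointerArrayIfNecessary() throws IOException {
      if (!inMemSorter.hasSpaceForAnotherRecord()) {
        long used = inMemSorter.getMemoryUsage();
        LongArray array;
        try {
          array = allocateArray(used / 8 * 2);  // request double the capacity
        } catch (TooLargePageException e) {
          spill();  // the pointer array cannot fit in a single page
          return;
        } catch (SparkOutOfMemoryError e) {
          // The failed allocation should already have triggered a spill,
          // which frees space in the existing array; if not, rethrow.
          if (!inMemSorter.hasSpaceForAnotherRecord()) {
            logger.error("Unable to grow the pointer array");
            throw e;
          }
          return;
        }
        // If spilling already freed enough space, release the new array;
        // otherwise move the pointers into it.
        if (inMemSorter.hasSpaceForAnotherRecord()) {
          freeArray(array);
        } else {
          inMemSorter.expandPointerArray(array);
        }
      }
    }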

http://git-wip-us.apache.org/repos/asf/spark/blob/3a7494df/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8b8e15e..66118f4 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -25,6 +25,7 @@ import java.util.Queue;
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -349,7 +350,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
        // The pointer array is too big to fit in a single page, spill.
         spill();
         return;
-      } catch (OutOfMemoryError e) {
+      } catch (SparkOutOfMemoryError e) {
        // should have triggered spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {
           logger.error("Unable to grow the pointer array");

http://git-wip-us.apache.org/repos/asf/spark/blob/3a7494df/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 3bb87a6..951d076 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -24,6 +24,7 @@ import org.apache.avro.reflect.Nullable;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.UnsafeAlignedOffset;
@@ -212,7 +213,7 @@ public final class UnsafeInMemorySorter {
 
   public void expandPointerArray(LongArray newArray) {
     if (newArray.size() < array.size()) {
-      throw new OutOfMemoryError("Not enough memory to grow pointer array");
+      throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
     }
     Platform.copyMemory(
       array.getBaseObject(),

http://git-wip-us.apache.org/repos/asf/spark/blob/3a7494df/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index af0a0ab..2c3a8ef 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,7 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
 import org.apache.spark.shuffle.FetchFailedException
@@ -553,10 +553,9 @@ private[spark] class Executor(
 
          // Don't forcibly exit unless the exception was inherently fatal, to avoid
           // stopping other tasks unnecessarily.
-          if (Utils.isFatalError(t)) {
+          if (!t.isInstanceOf[SparkOutOfMemoryError] && Utils.isFatalError(t)) {
            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
           }
-
       } finally {
         runningTasks.remove(taskId)
       }
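
A hedged Java rendering of the Scala guard above (isFatalError stands in
for Utils.isFatalError; the handler lookup is illustrative):

    // Forcibly exit the executor only for fatal errors that are NOT
    // Spark's task-level OOM; SparkOutOfMemoryError now fails just the task.
    if (!(t instanceof SparkOutOfMemoryError) && isFatalError(t)) {
      uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t);
    }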

http://git-wip-us.apache.org/repos/asf/spark/blob/3a7494df/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 756eeb6..9dc334c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -205,7 +206,7 @@ class TungstenAggregationIterator(
           buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
           if (buffer == null) {
             // failed to allocate the first page
-            throw new OutOfMemoryError("Not enough memory for aggregation")
+            throw new SparkOutOfMemoryError("Not enough memory for aggregation")
           }
         }
         processRow(buffer, newInput)

