[ https://issues.apache.org/jira/browse/SPARK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15694677#comment-15694677 ]

Roy Wang commented on SPARK-14560:
----------------------------------

The UnsafeExternalSorter consumers used a lot of memory, but it looks like TaskMemoryManager could not spill them when the acquireExecutionMemory method was called:

public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
    assert(required >= 0);
    assert(consumer != null);
    MemoryMode mode = consumer.getMode();
    // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
    // memory here, then it may not make sense to spill since that would only end up freeing
    // off-heap memory. This is subject to change, though, so it may be risky to make this
    // optimization now in case we forget to undo it later when making changes.
    synchronized (this) {
      long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

      // Try to release memory from other consumers first, so we can reduce the frequency of
      // spilling and avoid having too many spilled files.
      if (got < required) {
        // Call spill() on other consumers to release memory.
        for (MemoryConsumer c: consumers) {
          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
            try {
              long released = c.spill(required - got, consumer);
              if (released > 0) {
                logger.info("Task {} released {} from {} for {} by {} mode", taskAttemptId,
                  Utils.bytesToString(released), c, consumer, mode.name());
                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
                if (got >= required) {
                  break;
                }
              }
            } catch (IOException e) {
              logger.error("error while calling spill() on " + c, e);
              throw new OutOfMemoryError("error while calling spill() on " + c + " : "
                + e.getMessage());
            }
          }
        }
      }

      // Call spill() on the requesting consumer itself.
      if (got < required) {
        try {
          long released = consumer.spill(required - got, consumer);
          if (released > 0) {
            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
              Utils.bytesToString(released), consumer);
            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
          }
        } catch (IOException e) {
          logger.error("error while calling spill() on " + consumer, e);
          throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
            + e.getMessage());
        }
      }

      consumers.add(consumer);
      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
      return got;
    }
}
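
For context, the spill loop above can only reclaim memory if each registered consumer's spill() implementation actually writes its buffered data out and releases the memory it holds. The sketch below shows what a cooperating consumer looks like; the class CooperativeBuffer and its internals are hypothetical, written against the MemoryConsumer API roughly as it appears in Spark 2.x (constructor and method signatures differ between releases):

import java.io.IOException;

import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;

// Hypothetical consumer, for illustration only: it acquires execution memory
// through the MemoryConsumer API and gives it back when another consumer
// triggers a spill.
public class CooperativeBuffer extends MemoryConsumer {

  public CooperativeBuffer(TaskMemoryManager tmm) {
    super(tmm, tmm.pageSizeBytes(), MemoryMode.ON_HEAP);
  }

  /** Reserve room for one record; this may force other consumers to spill. */
  public void reserve(long recordSize) {
    acquireMemory(recordSize);  // routes through TaskMemoryManager.acquireExecutionMemory
  }

  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    // A real implementation would write its buffered data to disk first; here
    // we only release what we hold so the triggering consumer can acquire it.
    long released = Math.min(size, getUsed());
    freeMemory(released);
    return released;
  }
}

UnsafeExternalSorter does implement spill(), which is what makes it surprising that, in the log below, none of the 64.0 MB consumers appear to have been spilled before a 276-byte request failed.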

Here is the debug log from when the OOM happened:

16/11/24 06:20:26 INFO TaskMemoryManager: 0 bytes of memory were used by task 72969 but are not associated with specific consumers
16/11/24 06:20:26 INFO TaskMemoryManager: 18911656686 bytes of memory are used for execution and 1326241517 bytes of memory are used for storage
16/11/24 06:20:26 INFO TaskMemoryManager: Memory used in task 72969
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@45f5aaf1: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@25bc0eb9: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7ca441ad: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7c4b6a45: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@152d313e: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@f936c87: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@578f36d: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@4f4ab652: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@6947ad58: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@38eb70df: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@6c4c0f79: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1d2f3c19: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5d010e6: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@19a2279b: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7536c80b: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@53b3318d: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@509328c4: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@4b175ef: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@69d667b7: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@313de45f: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@468cefe9: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5d8b57ee: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2f594f30: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@19e5588b: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@157c760c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@237d740c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@73f20a80: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@4be26b90: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@30394641: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@36704c08: 64.0 KB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@762c8552: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1dcbd5c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@ab68433: 64.0 KB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@62387ebe: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5ab02a36: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@3a22beaa: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@69957a03: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@258f5017: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1317b3b6: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@6a5ae3fe: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7d6129c4: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@162f3089: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@41e5a64c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2a17b9ba: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@504445eb: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@346874db: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@10f66341: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@41aae712: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@288eac32: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@638fb145: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2941868f: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5cf3f921: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7dfc722f: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@694c9d96: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@80d13d6: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@16de0e05: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@203e95e0: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@52944472: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b3774d0: 50.9 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7fb08b5e: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@56ab8458: 644.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@24e30316: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@18a4a85d: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@523cbdc0: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5c611d9c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: 0 bytes of memory were used by task 72969 but are not associated with specific consumers
16/11/24 06:20:26 INFO TaskMemoryManager: 18911656686 bytes of memory are used for execution and 1326241517 bytes of memory are used for storage
16/11/24 06:20:26 ERROR SparkHiveDynamicPartitionWriterContainer: Aborting task.
java.lang.OutOfMemoryError: Unable to acquire 276 bytes of memory, got 0

> Cooperative Memory Management for Spillables
> --------------------------------------------
>
>                 Key: SPARK-14560
>                 URL: https://issues.apache.org/jira/browse/SPARK-14560
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.1
>            Reporter: Imran Rashid
>            Assignee: Lianhui Wang
>             Fix For: 2.0.0
>
>
> SPARK-10342 introduced cooperative memory management for SQL operators that
> can spill; however, {{Spillable}}s used by the old RDD API still do not
> cooperate. This can lead to memory starvation, in particular on a
> shuffle-to-shuffle stage, eventually resulting in errors like:
> {noformat}
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Memory used in task 3081
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@69ab0291: 32.0 KB
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317230346 bytes of memory were used by task 3081 but are not associated with specific consumers
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317263114 bytes of memory are used for execution and 1710484 bytes of memory are used for storage
> 16/03/28 08:59:54 ERROR executor.Executor: Managed memory leak detected; size = 1317230346 bytes, TID = 3081
> 16/03/28 08:59:54 ERROR executor.Executor: Exception in task 533.0 in stage 3.0 (TID 3081)
> java.lang.OutOfMemoryError: Unable to acquire 75 bytes of memory, got 0
>         at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>         at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
>         at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
>         at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
>         at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This can happen anytime the shuffle-read side requires more memory than is
> available for the task. Since the shuffle-read side doubles its memory
> request each time, it can easily end up acquiring all of the available
> memory, even if it does not use it. E.g., say that after the final spill,
> the shuffle-read side requires 10 MB more memory, and there is 15 MB of
> memory available. But if it starts at 2 MB, it will double to 4, 8, and then
> request 16 MB of memory, and in fact get all available 15 MB. Since the 15
> MB of memory is sufficient, it will not spill, and will continue holding on
> to all available memory. But this leaves *no* memory available for the
> shuffle-write side. Since the shuffle-write side cannot request the
> shuffle-read side to free up memory, this leads to an OOM.
> The simple solution is to make {{Spillable}} implement {{MemoryConsumer}} as
> well, so RDDs can benefit from the cooperative memory management introduced
> by SPARK-10342.
> Note that an additional improvement would be for the shuffle-read side to
> simply release unused memory, without spilling, in case that would leave
> enough memory, and only spill if that was inadequate. However, that can come
> as a later improvement.
> *Workaround*: You can set
> {{spark.shuffle.spill.numElementsForceSpillThreshold=N}} to force spilling to
> occur every {{N}} elements, thus preventing the shuffle-read side from ever
> grabbing all of the available memory. However, this requires careful tuning
> of {{N}} to specific workloads: too big, and you will still get an OOM; too
> small, and there will be so much spilling that performance will suffer
> drastically. Furthermore, this workaround uses an *undocumented*
> configuration with *no compatibility guarantees* for future versions of Spark.
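
As a side note on the doubling behaviour described in the quoted description, the toy simulation below (illustrative only, not Spark code) reproduces the 2 -> 4 -> 8 -> 15 MB sequence and shows why the shuffle-read side ends up holding all of the free execution memory:

// Toy model of the shuffle-read side's growth policy: each step asks for twice
// what it already holds, and the memory pool grants whatever is still free.
public class DoublingGrabsEverything {
  public static void main(String[] args) {
    long availableMb = 15;  // free execution memory in the example above
    long heldMb = 2;        // what the shuffle-read side starts with

    while (heldMb < availableMb) {
      long requestMb = heldMb * 2;                // doubling growth
      heldMb = Math.min(requestMb, availableMb);  // pool caps the grant at what is free
      System.out.printf("requested %d MB, now holding %d MB%n", requestMb, heldMb);
    }
    // Final state: holding all 15 MB even though only ~10 MB more was needed,
    // leaving nothing for the shuffle-write side.
  }
}

The workaround from the description is set like any other Spark configuration, e.g. spark-submit --conf spark.shuffle.spill.numElementsForceSpillThreshold=<N>, where <N> is a workload-specific value you have to tune yourself.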



