[ https://issues.apache.org/jira/browse/SPARK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15694677#comment-15694677 ]
Roy Wang commented on SPARK-14560:
----------------------------------

UnsafeExternalSorter used a lot of memory, but it looked like TaskMemoryManager could not spill these consumers when the acquireExecutionMemory method was called.

{code:java}
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
  assert(required >= 0);
  assert(consumer != null);
  MemoryMode mode = consumer.getMode();
  // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
  // memory here, then it may not make sense to spill since that would only end up freeing
  // off-heap memory. This is subject to change, though, so it may be risky to make this
  // optimization now in case we forget to undo it late when making changes.
  synchronized (this) {
    long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

    // Try to release memory from other consumers first, then we can reduce the frequency of
    // spilling, avoid to have too many spilled files.
    if (got < required) {
      // Call spill() on other consumers to release memory
      for (MemoryConsumer c: consumers) {
        if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
          try {
            long released = c.spill(required - got, consumer);
            if (released > 0) {
              logger.info("Task {} released {} from {} for {} by {} mode", taskAttemptId,
                Utils.bytesToString(released), c, consumer, mode.name());
              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
              if (got >= required) {
                break;
              }
            }
          } catch (IOException e) {
            logger.error("error while calling spill() on " + c, e);
            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
              + e.getMessage());
          }
        }
      }
    }

    // call spill() on itself
    if (got < required) {
      try {
        long released = consumer.spill(required - got, consumer);
        if (released > 0) {
          logger.debug("Task {} released {} from itself ({})", taskAttemptId,
            Utils.bytesToString(released), consumer);
          got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
        }
      } catch (IOException e) {
        logger.error("error while calling spill() on " + consumer, e);
        throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
          + e.getMessage());
      }
    }

    consumers.add(consumer);
    logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
    return got;
  }
}
{code}

Here is the debug log from when the OOM happened:

{noformat}
16/11/24 06:20:26 INFO TaskMemoryManager: 0 bytes of memory were used by task 72969 but are not associated with specific consumers
16/11/24 06:20:26 INFO TaskMemoryManager: 18911656686 bytes of memory are used for execution and 1326241517 bytes of memory are used for storage
16/11/24 06:20:26 INFO TaskMemoryManager: Memory used in task 72969
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@45f5aaf1: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@25bc0eb9: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7ca441ad: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7c4b6a45: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@152d313e: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@f936c87: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@578f36d: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@4f4ab652: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@6947ad58: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@38eb70df: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@6c4c0f79: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1d2f3c19: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5d010e6: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@19a2279b: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7536c80b: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@53b3318d: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@509328c4: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@4b175ef: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@69d667b7: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@313de45f: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@468cefe9: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5d8b57ee: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2f594f30: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@19e5588b: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@157c760c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@237d740c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@73f20a80: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@4be26b90: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@30394641: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@36704c08: 64.0 KB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@762c8552: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1dcbd5c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@ab68433: 64.0 KB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@62387ebe: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5ab02a36: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@3a22beaa: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@69957a03: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@258f5017: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1317b3b6: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@6a5ae3fe: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7d6129c4: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@162f3089: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@41e5a64c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2a17b9ba: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@504445eb: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@346874db: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@10f66341: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@41aae712: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@288eac32: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@638fb145: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2941868f: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5cf3f921: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7dfc722f: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@694c9d96: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@80d13d6: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@16de0e05: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@203e95e0: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@52944472: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2b3774d0: 50.9 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7fb08b5e: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@56ab8458: 644.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@24e30316: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@18a4a85d: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@523cbdc0: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5c611d9c: 64.0 MB
16/11/24 06:20:26 INFO TaskMemoryManager: 0 bytes of memory were used by task 72969 but are not associated with specific consumers
16/11/24 06:20:26 INFO TaskMemoryManager: 18911656686 bytes of memory are used for execution and 1326241517 bytes of memory are used for storage
16/11/24 06:20:26 ERROR SparkHiveDynamicPartitionWriterContainer: Aborting task.
java.lang.OutOfMemoryError: Unable to acquire 276 bytes of memory, got 0
{noformat}
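To make the failure mode concrete, below is a minimal, self-contained Java sketch of the acquire-then-spill loop quoted above. Everything in it (Pool, ToyConsumer, the sizes, the consumer count) is invented for illustration and is not Spark code; it only models the observed behavior, in which every other consumer's spill() releases nothing, so the request fails:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Toy model of TaskMemoryManager.acquireExecutionMemory(); illustrative only.
public class SpillLoopSketch {

  // A fixed-size execution-memory pool that grants whatever is still free.
  static class Pool {
    long free;
    Pool(long free) { this.free = free; }
    long acquire(long wanted) {
      long granted = Math.min(wanted, free);
      free -= granted;
      return granted;
    }
    void release(long bytes) { free += bytes; }
  }

  // A consumer holding memory that it may or may not be able to give back.
  static class ToyConsumer {
    long used;
    final boolean spillable;
    ToyConsumer(long used, boolean spillable) {
      this.used = used;
      this.spillable = spillable;
    }
    // Stands in for MemoryConsumer.spill(size, trigger): returns bytes released.
    long spill(Pool pool) {
      if (!spillable) {
        return 0; // e.g. pages pinned by an in-flight operation
      }
      pool.release(used);
      long released = used;
      used = 0;
      return released;
    }
  }

  // Mirrors the quoted loop: grab what we can, then ask other consumers to spill.
  // (Spark also tries to spill the requester itself; omitted here for brevity.)
  static long acquire(Pool pool, List<ToyConsumer> others, long required) {
    long got = pool.acquire(required);
    for (ToyConsumer c : others) {
      if (got >= required) break;
      if (c.spill(pool) > 0) {
        got += pool.acquire(required - got);
      }
    }
    return got; // may still be less than required
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    Pool pool = new Pool(0); // execution memory already exhausted
    List<ToyConsumer> sorters = new ArrayList<>();
    for (int i = 0; i < 60; i++) {
      // Sixty 64 MB holders that cannot spill, echoing the sorters in the log.
      sorters.add(new ToyConsumer(64 * mb, false));
    }
    long got = acquire(pool, sorters, 276); // the same 276-byte request as above
    System.out.println("required 276 bytes, got " + got); // prints "got 0"
  }
}
{code}

With every spill() returning 0, {{got}} never grows, which matches the log: dozens of 64 MB sorters are registered, yet a 276-byte request gets nothing and the task aborts.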
> Cooperative Memory Management for Spillables
> --------------------------------------------
>
>                 Key: SPARK-14560
>                 URL: https://issues.apache.org/jira/browse/SPARK-14560
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.1
>            Reporter: Imran Rashid
>            Assignee: Lianhui Wang
>             Fix For: 2.0.0
>
>
> SPARK-10432 introduced cooperative memory management for SQL operators that
> can spill; however, {{Spillable}}s used by the old RDD API still do not
> cooperate. This can lead to memory starvation, in particular on a
> shuffle-to-shuffle stage, eventually resulting in errors like:
> {noformat}
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Memory used in task 3081
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@69ab0291: 32.0 KB
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317230346 bytes of memory were used by task 3081 but are not associated with specific consumers
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317263114 bytes of memory are used for execution and 1710484 bytes of memory are used for storage
> 16/03/28 08:59:54 ERROR executor.Executor: Managed memory leak detected; size = 1317230346 bytes, TID = 3081
> 16/03/28 08:59:54 ERROR executor.Executor: Exception in task 533.0 in stage 3.0 (TID 3081)
> java.lang.OutOfMemoryError: Unable to acquire 75 bytes of memory, got 0
>         at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>         at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
>         at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
>         at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
>         at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This can happen anytime the shuffle-read side requires more memory than what
> is available for the task. Since the shuffle-read side doubles its memory
> request each time, it can easily end up acquiring all of the available
> memory, even if it does not use it. E.g., say that after the final spill,
> the shuffle-read side requires 10 MB more memory, and there is 15 MB of
> memory available. But if it starts at 2 MB, it will double to 4, 8, and
> then request 16 MB of memory, and in fact get all available 15 MB. Since
> the 15 MB of memory is sufficient, it will not spill, and will continue
> holding on to all available memory. But this leaves *no* memory available
> for the shuffle-write side. Since the shuffle-write side cannot request the
> shuffle-read side to free up memory, this leads to an OOM.
>
> The simple solution is to make {{Spillable}} implement {{MemoryConsumer}}
> as well, so RDDs can benefit from the cooperative memory management
> introduced by SPARK-10342.
>
> Note that an additional improvement would be for the shuffle-read side to
> simply release unused memory, without spilling, in case that would leave
> enough memory, and only spill if that was inadequate. However, that can
> come as a later improvement.
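For illustration only, here is a rough Java sketch of the direction proposed above: a spillable, in-memory collection that participates in the MemoryConsumer contract, so TaskMemoryManager's acquireExecutionMemory() loop can ask it to release memory. All names and the class shape here are invented for this sketch (the real {{Spillable}} is Scala, and the actual patch may differ):

{code:java}
import java.io.IOException;

// Hypothetical sketch of "make Spillable implement MemoryConsumer".
// In real Spark this would extend MemoryConsumer; simplified here.
abstract class CooperativeSpillable {

  private long used; // bytes currently held by the in-memory collection

  // Called by the owning task's insert path as the collection grows.
  protected void acquired(long bytes) { used += bytes; }

  // What the acquireExecutionMemory() loop would invoke on us when another
  // consumer in the same task is starved for memory.
  public long spill(long size, Object trigger) throws IOException {
    if (trigger == this || used == 0) {
      return 0; // nothing to give back
    }
    spillToDisk();          // write the in-memory collection out
    long released = used;
    used = 0;
    return released;        // this memory is now available to the trigger
  }

  // Subclasses (e.g. an ExternalSorter-like collection) implement the I/O.
  protected abstract void spillToDisk() throws IOException;
}
{code}

The key point is the spill(size, trigger) entry point: once a {{Spillable}} exposes it, the loop quoted in the comment above can reclaim its memory on behalf of a starved consumer instead of failing with an OOM.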
> *Workaround*: You can set
> {{spark.shuffle.spill.numElementsForceSpillThreshold=N}} to force spilling
> to occur every {{N}} elements, thus preventing the shuffle-read side from
> ever grabbing all of the available memory. However, this requires careful
> tuning of {{N}} for specific workloads: too big, and you will still get an
> OOM; too small, and there will be so much spilling that performance will
> suffer drastically. Furthermore, this workaround uses an *undocumented*
> configuration with *no compatibility guarantees* for future versions of
> Spark.
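To make the workaround concrete, here is a minimal sketch of applying it when building the job's configuration. The threshold value (5000000) is purely illustrative and must be tuned per workload, as cautioned above; the same setting can also be passed on the command line via {{--conf}} to {{spark-submit}}:

{code:java}
import org.apache.spark.SparkConf;

public class ForceSpillWorkaround {
  public static void main(String[] args) {
    // Undocumented knob with no compatibility guarantees across Spark
    // versions; the value below is illustrative, not a recommendation.
    SparkConf conf = new SparkConf()
        .setAppName("force-spill-workaround-example")
        .set("spark.shuffle.spill.numElementsForceSpillThreshold", "5000000");
    // ... build the SparkContext from this conf and run the job as usual ...
  }
}
{code}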