[ https://issues.apache.org/jira/browse/SPARK-35486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-35486.
--------------------------
    Fix Version/s: 3.2.0
       Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/32625

> MemoryConsumer reservations that trigger a partial self-spill can fail even if memory is available
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35486
>                 URL: https://issues.apache.org/jira/browse/SPARK-35486
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.8, 3.0.2, 3.1.1
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>            Priority: Major
>             Fix For: 3.2.0
>
>
> When a memory reservation triggers a self-spill,
> ExecutionMemoryPool#releaseMemory() immediately notifies waiting tasks that
> memory has been freed. Any waiting task that holds less than 1/(2N) of the
> memory pool, where N is the number of active tasks, may acquire the newly
> freed memory before the current task has a chance to do so, which causes the
> original memory reservation to fail. If the initial spill released only part
> of the consumer's memory, the reservation could instead have been satisfied
> by asking the consumer to spill again.
> For example, the following test fails when added to MemoryManagerSuite:
> {code:scala}
> test("SPARK-35486: memory freed by self-spilling is taken by another task") {
>     val memoryManager = createMemoryManager(1000L)
>     val t1MemManager = new TaskMemoryManager(memoryManager, 1)
>     val t2MemManager = new TaskMemoryManager(memoryManager, 2)
>     val c1 = new TestPartialSpillingMemoryConsumer(t1MemManager)
>     val c2 = new TestMemoryConsumer(t2MemManager)
>     val futureTimeout: Duration = 20.seconds
>     // t1 acquires 1000 bytes. This should succeed immediately.
>     val t1Result1 = Future { c1.acquireMemory(1000L) }
>     assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L)
>     assert(c1.getUsed() === 1000L)
>     // t2 attempts to acquire 500 bytes. This should block since there is no memory available.
>     val t2Result1 = Future { c2.acquireMemory(500L) }
>     Thread.sleep(300)
>     assert(!t2Result1.isCompleted)
>     assert(c2.getUsed() === 0L)
>     // t1 attempts to acquire 500 bytes, causing its existing reservation to spill
>     // partially. t2 is first in line for the freed memory.
>     //
>     // SPARK-35486: This currently causes the reservation to fail. Instead, t1 should
>     // try again, causing the rest of the reservation to spill.
>     val t1Result2 = Future { c1.acquireMemory(500L) }
>     // The spill should release enough memory for both t1's and t2's reservations to be
>     // satisfied.
>     assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 500L)
>     // SPARK-35486: This assertion fails: 0L != 500L.
>     // assert(ThreadUtils.awaitResult(t1Result2, futureTimeout) === 500L)
>   }
> {code}
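The byte counts in the test above follow from the execution pool's per-task share rule: with N active tasks, a task is capped at 1/N of the pool, and a task that cannot be fully served blocks while it holds less than 1/(2N). The sketch below is a simplified, single-call approximation of that rule as found in ExecutionMemoryPool#acquireMemory. It assumes the pool cannot grow (no storage eviction), and the names SharePolicyModel and tryAcquire are hypothetical, used only for illustration.

```java
// Simplified approximation of the per-task share rule in Spark's execution memory pool.
// Assumption: the pool cannot grow (no storage eviction). All names here are hypothetical.
final class SharePolicyModel {
  /**
   * Returns the bytes granted for a request, or -1 if the task would block and wait.
   *
   * @param numBytes       bytes requested
   * @param curMem         bytes the task already holds
   * @param memoryFree     bytes currently free in the pool
   * @param poolSize       total pool size
   * @param numActiveTasks N, the number of active tasks in the pool
   */
  static long tryAcquire(long numBytes, long curMem, long memoryFree, long poolSize, int numActiveTasks) {
    long maxPerTask = poolSize / numActiveTasks;        // 1/N cap
    long minPerTask = poolSize / (2L * numActiveTasks); // 1/(2N) floor
    long maxToGrant = Math.max(Math.min(numBytes, maxPerTask - curMem), 0);
    long toGrant = Math.min(maxToGrant, memoryFree);
    if (toGrant < numBytes && curMem + toGrant < minPerTask) {
      return -1; // below its 1/(2N) share: the task waits for another task to release memory
    }
    return toGrant;
  }

  public static void main(String[] args) {
    // Test scenario: 1000-byte pool, N = 2, so the cap is 500 bytes and the floor is 250.
    System.out.println(tryAcquire(500, 0, 0, 1000, 2));    // t2 while t1 holds everything: -1 (waits)
    System.out.println(tryAcquire(500, 1000, 0, 1000, 2)); // t1 above its cap: 0, so it falls back to spilling
    System.out.println(tryAcquire(500, 500, 0, 1000, 2));  // t1 after t2 took the spilled 500: 0
    System.out.println(tryAcquire(500, 0, 500, 1000, 2));  // t1 after spilling the rest: 500
  }
}
```

With N = 2, t2 (holding 0 bytes) is below the 250-byte floor and waits, whereas t1 (holding 1000 bytes, then 500) is granted 0 bytes without blocking. Only once t1 has spilled its remaining 500 bytes can the pool grant the 500 it asked for.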
> {code:java}
> package org.apache.spark.memory;
>
> import java.io.IOException;
>
> /**
>  * A TestMemoryConsumer which, when asked to spill, releases only enough memory to
>  * satisfy the request rather than releasing all of its memory.
>  */
> public class TestPartialSpillingMemoryConsumer extends TestMemoryConsumer {
>   public TestPartialSpillingMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) {
>     super(memoryManager, mode);
>   }
>
>   public TestPartialSpillingMemoryConsumer(TaskMemoryManager memoryManager) {
>     super(memoryManager);
>   }
>
>   @Override
>   public long spill(long size, MemoryConsumer trigger) throws IOException {
>     // Release at most `size` bytes, keeping the rest of the reservation in place.
>     long used = getUsed();
>     long released = Math.min(used, size);
>     free(released);
>     return released;
>   }
> }
> {code}
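To make the race concrete without a Spark build, here is a toy, single-threaded model. Pool, ToyConsumer, acquireOnce, acquireWithRetry and run are hypothetical names, not Spark APIs. The pool hands released bytes straight to a blocked waiter, standing in for releaseMemory() waking waiting tasks, and ToyConsumer.spill mirrors TestPartialSpillingMemoryConsumer by releasing only what is asked. acquireOnce performs a single self-spill, as in the reported behavior; acquireWithRetry keeps spilling while the consumer still has memory, which is the "try again" behavior the test comments ask for and not necessarily how the linked pull request implements the fix.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy, single-threaded model of SPARK-35486. All names here are hypothetical, not Spark APIs.
final class SelfSpillRaceModel {
  /** Tracks free bytes and immediately hands released bytes to blocked waiters. */
  static final class Pool {
    long free;
    // Each waiter is {requestedBytes, grantedBytes}.
    final Deque<long[]> waiters = new ArrayDeque<>();

    Pool(long free) {
      this.free = free;
    }

    void release(long bytes) {
      free += bytes;
      // A waiting task grabs the freed memory before the releasing task can re-acquire it.
      while (!waiters.isEmpty() && free >= waiters.peekFirst()[0]) {
        long[] w = waiters.pollFirst();
        free -= w[0];
        w[1] = w[0];
      }
    }

    long tryAcquire(long bytes) {
      long granted = Math.min(bytes, free);
      free -= granted;
      return granted;
    }
  }

  /** Consumer that spills only what is requested, like TestPartialSpillingMemoryConsumer. */
  static final class ToyConsumer {
    final Pool pool;
    long used;

    ToyConsumer(Pool pool, long used) {
      this.pool = pool;
      this.used = used;
    }

    long spill(long size) {
      long released = Math.min(used, size);
      used -= released;
      pool.release(released);
      return released;
    }
  }

  /** One self-spill attempt, then give up with whatever was obtained. */
  static long acquireOnce(Pool pool, ToyConsumer c, long required) {
    long got = pool.tryAcquire(required);
    if (got < required && c.spill(required - got) > 0) {
      got += pool.tryAcquire(required - got);
    }
    c.used += got;
    return got;
  }

  /** Keep self-spilling until the request is met or a spill releases nothing. */
  static long acquireWithRetry(Pool pool, ToyConsumer c, long required) {
    long got = pool.tryAcquire(required);
    while (got < required && c.spill(required - got) > 0) {
      got += pool.tryAcquire(required - got);
    }
    c.used += got;
    return got;
  }

  /** Returns {bytes granted to t1, bytes granted to t2} for the scenario in the test. */
  static long[] run(boolean retry) {
    Pool pool = new Pool(0);                      // nothing free: t1 holds the whole 1000-byte pool
    ToyConsumer t1 = new ToyConsumer(pool, 1000);
    long[] t2 = {500, 0};                         // t2 is blocked waiting for 500 bytes
    pool.waiters.addLast(t2);
    long t1Got = retry ? acquireWithRetry(pool, t1, 500) : acquireOnce(pool, t1, 500);
    return new long[] {t1Got, t2[1]};
  }

  public static void main(String[] args) {
    long[] once = run(false);
    long[] retried = run(true);
    System.out.println("single self-spill: t1=" + once[0] + ", t2=" + once[1]);
    System.out.println("with retry:        t1=" + retried[0] + ", t2=" + retried[1]);
  }
}
```

run(false) reproduces the 0-byte result from the test (t2 gets its 500 bytes, t1 gets nothing), while run(true) lets both tasks obtain 500 bytes. The retry loop stops as soon as a spill releases nothing, so a consumer with no memory left cannot loop forever.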



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
