[ https://issues.apache.org/jira/browse/SPARK-35486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
wuyi resolved SPARK-35486.
--------------------------
    Fix Version/s: 3.2.0
       Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/32625

> MemoryConsumer reservations that trigger a partial self-spill can fail even if memory is available
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35486
>                 URL: https://issues.apache.org/jira/browse/SPARK-35486
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.8, 3.0.2, 3.1.1
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>            Priority: Major
>             Fix For: 3.2.0
>
>
> When a memory reservation triggers a self-spill, ExecutionMemoryPool#releaseMemory()
> immediately notifies waiting tasks that memory has been freed. If any waiting task holds less
> than 1/2N of the memory pool (where N is the number of active tasks), it may acquire the newly
> freed memory before the current task has a chance to do so, causing the original reservation
> to fail. If the initial spill did not release all of the consumer's memory, the reservation
> could have been satisfied by asking the consumer to spill again.
>
> For example, the following test fails when added to MemoryManagerSuite:
> {code:scala}
> test("SPARK-35486: memory freed by self-spilling is taken by another task") {
>   val memoryManager = createMemoryManager(1000L)
>   val t1MemManager = new TaskMemoryManager(memoryManager, 1)
>   val t2MemManager = new TaskMemoryManager(memoryManager, 2)
>   val c1 = new TestPartialSpillingMemoryConsumer(t1MemManager)
>   val c2 = new TestMemoryConsumer(t2MemManager)
>   val futureTimeout: Duration = 20.seconds
>
>   // t1 acquires 1000 bytes. This should succeed immediately.
>   val t1Result1 = Future { c1.acquireMemory(1000L) }
>   assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L)
>   assert(c1.getUsed() === 1000L)
>
>   // t2 attempts to acquire 500 bytes. This should block since there is no memory available.
>   val t2Result1 = Future { c2.acquireMemory(500L) }
>   Thread.sleep(300)
>   assert(!t2Result1.isCompleted)
>   assert(c2.getUsed() === 0L)
>
>   // t1 attempts to acquire 500 bytes, causing its existing reservation to spill partially. t2 is
>   // first in line for the freed memory.
>   //
>   // SPARK-35486: This currently causes the reservation to fail. Instead, t1 should try again,
>   // causing the rest of the reservation to spill.
>   val t1Result2 = Future { c1.acquireMemory(500L) }
>
>   // The spill should release enough memory for both t1's and t2's reservations to be satisfied.
>   assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 500L)
>   // SPARK-35486: This assertion fails: 0L != 500L.
>   // assert(ThreadUtils.awaitResult(t1Result2, futureTimeout) === 500L)
> }
> {code}
> {code:java}
> package org.apache.spark.memory;
>
> import java.io.IOException;
>
> /**
>  * A TestMemoryConsumer which, when asked to spill, releases only enough memory to satisfy the
>  * request rather than releasing all its memory.
>  */
> public class TestPartialSpillingMemoryConsumer extends TestMemoryConsumer {
>   public TestPartialSpillingMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) {
>     super(memoryManager, mode);
>   }
>
>   public TestPartialSpillingMemoryConsumer(TaskMemoryManager memoryManager) {
>     super(memoryManager);
>   }
>
>   @Override
>   public long spill(long size, MemoryConsumer trigger) throws IOException {
>     // Release at most `size` bytes, never more than this consumer currently holds.
>     long used = getUsed();
>     long released = Math.min(used, size);
>     free(released);
>     return released;
>   }
> }
> {code}

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)
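The race in the report can be replayed without Spark in a small, single-threaded model. This is a hypothetical sketch: Pool, Waiter, PartialSpiller, acquire and scenario are illustrative names, not Spark APIs. The model ignores Spark's per-task fairness bounds (1/2N to 1/N of the pool) and real thread scheduling, and it assumes the worst-case interleaving from the report: memory released by a spill goes to a blocked waiter before the spilling task can re-acquire it. With a single self-spill attempt, t1 ends up with 0 bytes. Retrying the self-spill while the consumer still holds memory lets both t1 and t2 get 500 bytes, which is the outcome the test expects. It illustrates the idea in the issue (ask the consumer to spill again), not necessarily how the linked pull request implements the fix.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, single-threaded model of SPARK-35486. None of these classes are Spark APIs.
 * Simplifications: no per-task fairness caps, and a blocked waiter is only granted its full request.
 */
public class SelfSpillRetrySketch {

    /** A blocked request from another task (t2 in the report). */
    static final class Waiter {
        final long wanted;
        long granted;

        Waiter(long wanted) {
            this.wanted = wanted;
        }
    }

    /** Shared execution memory with a FIFO queue of blocked waiters. */
    static final class Pool {
        private long free;
        final Deque<Waiter> waiters = new ArrayDeque<>();

        Pool(long size) {
            this.free = size;
        }

        /** Grants as much of the request as is currently free, without blocking. */
        long tryAcquire(long bytes) {
            long granted = Math.min(bytes, free);
            free -= granted;
            return granted;
        }

        /**
         * Assumed worst-case interleaving after waiters are notified:
         * blocked waiters are served before the releasing task runs again.
         */
        void release(long bytes) {
            free += bytes;
            while (!waiters.isEmpty() && free >= waiters.peekFirst().wanted) {
                Waiter w = waiters.pollFirst();
                free -= w.wanted;
                w.granted = w.wanted;
            }
        }
    }

    /** Like TestPartialSpillingMemoryConsumer: spills only as much as it is asked for. */
    static final class PartialSpiller {
        final Pool pool;
        long used;

        PartialSpiller(Pool pool) {
            this.pool = pool;
        }

        long spill(long size) {
            long released = Math.min(used, size);
            used -= released;
            pool.release(released);
            return released;
        }
    }

    /** Acquires up to `required` bytes for `c`, self-spilling when the pool is short. */
    static long acquire(PartialSpiller c, long required, boolean retrySelfSpill) {
        long got = c.pool.tryAcquire(required);
        while (got < required) {
            long released = c.spill(required - got);
            if (released == 0) {
                break; // nothing left to spill
            }
            got += c.pool.tryAcquire(required - got);
            if (!retrySelfSpill) {
                break; // behavior the report describes: no second self-spill
            }
        }
        c.used += got;
        return got;
    }

    /** Replays the test's scenario and returns {bytes t1 got, bytes t2 got}. */
    static long[] scenario(boolean retrySelfSpill) {
        Pool pool = new Pool(1000L);
        PartialSpiller t1 = new PartialSpiller(pool);
        t1.used = pool.tryAcquire(1000L); // t1 takes the whole pool
        Waiter t2 = new Waiter(500L);
        pool.waiters.addLast(t2);         // t2 blocks: nothing is free
        long t1Got = acquire(t1, 500L, retrySelfSpill);
        return new long[] {t1Got, t2.granted};
    }

    public static void main(String[] args) {
        long[] single = scenario(false);
        long[] retry = scenario(true);
        System.out.println("single spill: t1=" + single[0] + " t2=" + single[1]);
        System.out.println("retry spill:  t1=" + retry[0] + " t2=" + retry[1]);
    }
}
```

Running main prints t1=0 t2=500 for the single-spill policy and t1=500 t2=500 with retries. In the retry case t1 spills its entire original 1000 bytes and then re-acquires 500, which is the "causing the rest of the reservation to spill" behavior the test comments ask for.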