mridulm commented on a change in pull request #32625: URL: https://github.com/apache/spark/pull/32625#discussion_r638397154
########## File path: core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala ########## @@ -240,6 +240,37 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft assert(ThreadUtils.awaitResult(t2Result2, 200.millis) === 0L) } + test("SPARK-35486: memory freed by self-spilling is taken by another task") { + val memoryManager = createMemoryManager(1000L) + val t1MemManager = new TaskMemoryManager(memoryManager, 1) + val t2MemManager = new TaskMemoryManager(memoryManager, 2) + val c1 = new TestPartialSpillingMemoryConsumer(t1MemManager) + val c2 = new TestMemoryConsumer(t2MemManager) + val futureTimeout: Duration = 20.seconds + + // t1 acquires 1000 bytes. This should succeed immediately. + val t1Result1 = Future { c1.acquireMemory(1000L) } + assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L) + assert(c1.getUsed() === 1000L) + + // t2 attempts to acquire 500 bytes. This should block since there is no memory available. + val t2Result1 = Future { c2.acquireMemory(500L) } + Thread.sleep(300) + assert(!t2Result1.isCompleted) + assert(c2.getUsed() === 0L) + + // t1 attempts to acquire 500 bytes, causing its existing reservation to spill partially. t2 is + // first in line for the freed memory, so t1 must try again, causing the rest of the reservation + // to spill. Review comment: Thanks for the detailed response @ankurdave ! I missed out on (3) + maxMemoryPerTask - makes sense ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org