Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163749065 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -232,78 +236,93 @@ private[spark] class MemoryStore( elementsUnrolled += 1 } + val valuesBuilder = if (keepUnrolling) { + Some(valuesHolder.getBuilder()) + } else { + None + } + + // Make sure that we have enough memory to store the block. By this point, it is possible that + // the block's actual memory usage has exceeded the unroll memory by a small amount, so we + // perform one final call to attempt to allocate additional memory if necessary. if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = - new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size - def transferUnrollToStorage(amount: Long): Unit = { - // Synchronize so that transfer is atomic - memoryManager.synchronized { - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount) - val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) - assert(success, "transferring unroll memory to storage memory failed") + val size = valuesBuilder.get.preciseSize + if (size > unrollMemoryUsedByThisBlock) { + val amountToRequest = size - unrollMemoryUsedByThisBlock + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) + if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest } } - // Acquire storage memory if necessary to store this block in memory. 
- val enoughStorageMemory = { - if (unrollMemoryUsedByThisBlock <= size) { - val acquiredExtra = - memoryManager.acquireStorageMemory( - blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) - if (acquiredExtra) { - transferUnrollToStorage(unrollMemoryUsedByThisBlock) - } - acquiredExtra - } else { // unrollMemoryUsedByThisBlock > size - // If this task attempt already owns more unroll memory than is necessary to store the - // block, then release the extra memory that will not be used. - val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) - transferUnrollToStorage(size) - true - } + } + + if (keepUnrolling) { --- End diff -- A small suggested improvement: ``` if (keepUnrolling) { val builder = valuesHolder.getBuilder() ... if (keepUnrolling) { val entry = builder.build() ... Right(entry.size) } else { ... logUnrollFailureMessage(blockId, builder.preciseSize) Left(unrollMemoryUsedByThisBlock) } } else { ... logUnrollFailureMessage(blockId, valuesHolder.estimatedSize) Left(unrollMemoryUsedByThisBlock) } ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org