Repository: spark Updated Branches: refs/heads/branch-1.4 205ed15f2 -> 7cea552e1
[SPARK-7698] Cache and reuse buffers in ExecutorMemoryAllocator when using heap allocation When on-heap memory allocation is used, ExecutorMemoryManager should maintain a cache / pool of buffers for re-use by tasks. This will significantly improve the performance of Tungsten's new sort-shuffle for jobs with many short-lived tasks by eliminating a major source of GC. This pull request is a minimum viable implementation of this idea. In its current form, this patch significantly improves performance on a stress test which launches huge numbers of short-lived shuffle map tasks back-to-back in the same JVM. Author: Josh Rosen <joshro...@databricks.com> Closes #6227 from JoshRosen/SPARK-7698 and squashes the following commits: fd6cb55 [Josh Rosen] SoftReference -> WeakReference b154e86 [Josh Rosen] WIP sketch of pooling in ExecutorMemoryManager (cherry picked from commit 7956dd7ab03e1542d89dd94c043f1e5131684199) Signed-off-by: Josh Rosen <joshro...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7cea552e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7cea552e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7cea552e Branch: refs/heads/branch-1.4 Commit: 7cea552e1edf1d4e0143349692ff751ca493a912 Parents: 205ed15 Author: Josh Rosen <joshro...@databricks.com> Authored: Wed May 20 16:37:11 2015 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Wed May 20 16:39:36 2015 -0700 ---------------------------------------------------------------------- .../unsafe/memory/ExecutorMemoryManager.java | 57 +++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7cea552e/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java ---------------------------------------------------------------------- diff --git 
a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java index 62c29c8..cbbe859 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java @@ -17,6 +17,12 @@ package org.apache.spark.unsafe.memory; +import java.lang.ref.WeakReference; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import javax.annotation.concurrent.GuardedBy; + /** * Manages memory for an executor. Individual operators / tasks allocate memory through * {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager. @@ -33,6 +39,12 @@ public class ExecutorMemoryManager { */ final boolean inHeap; + @GuardedBy("this") + private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize = + new HashMap<Long, LinkedList<WeakReference<MemoryBlock>>>(); + + private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024; + /** * Construct a new ExecutorMemoryManager. * @@ -44,15 +56,56 @@ public class ExecutorMemoryManager { } /** + * Returns true if allocations of the given size should go through the pooling mechanism and + * false otherwise. + */ + private boolean shouldPool(long size) { + // Very small allocations are less likely to benefit from pooling. + // At some point, we should explore supporting pooling for off-heap memory, but for now we'll + // ignore that case in the interest of simplicity. + return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator; + } + + /** * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed * to be zeroed out (call `zero()` on the result if this is necessary). 
*/ MemoryBlock allocate(long size) throws OutOfMemoryError { - return allocator.allocate(size); + if (shouldPool(size)) { + synchronized (this) { + final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + if (pool != null) { + while (!pool.isEmpty()) { + final WeakReference<MemoryBlock> blockReference = pool.pop(); + final MemoryBlock memory = blockReference.get(); + if (memory != null) { + assert (memory.size() == size); + return memory; + } + } + bufferPoolsBySize.remove(size); + } + } + return allocator.allocate(size); + } else { + return allocator.allocate(size); + } } void free(MemoryBlock memory) { - allocator.free(memory); + final long size = memory.size(); + if (shouldPool(size)) { + synchronized (this) { + LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + if (pool == null) { + pool = new LinkedList<WeakReference<MemoryBlock>>(); + bufferPoolsBySize.put(size, pool); + } + pool.add(new WeakReference<MemoryBlock>(memory)); + } + } else { + allocator.free(memory); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org