Repository: spark
Updated Branches:
  refs/heads/master b14993e1f -> 6a8abe29e
[SPARK-23508][CORE] Fix BlockManagerId in case blockManagerIdCache causes OOM

## What changes were proposed in this pull request?

blockManagerIdCache in BlockManagerId never removes old values, which may cause an OOM:

`val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()`

Whenever a new BlockManagerId is requested, it is put into this map and never evicted. This patch uses a bounded Guava cache for blockManagerIdCache instead. A heap dump is shown in [SPARK-23508](https://issues.apache.org/jira/browse/SPARK-23508).

## How was this patch tested?

Existing tests.

Author: zhoukang <zhoukang199...@gmail.com>

Closes #20667 from caneGuy/zhoukang/fix-history.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a8abe29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a8abe29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a8abe29

Branch: refs/heads/master
Commit: 6a8abe29ef3369b387d9bc2ee3459a6611246ab1
Parents: b14993e
Author: zhoukang <zhoukang199...@gmail.com>
Authored: Wed Feb 28 23:16:29 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Feb 28 23:16:29 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManagerId.scala | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6a8abe29/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 2c3da0e..d4a59c3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -18,7 +18,8 @@ package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.util.concurrent.ConcurrentHashMap
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
@@ -132,10 +133,17 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
-  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+  /**
+   * The max cache size is hardcoded to 10000, since the size of a BlockManagerId
+   * object is about 48B, the total memory cost should be below 1MB which is feasible.
+   */
+  val blockManagerIdCache = CacheBuilder.newBuilder()
+    .maximumSize(10000)
+    .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
+      override def load(id: BlockManagerId) = id
+    })
 
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
-    blockManagerIdCache.putIfAbsent(id, id)
     blockManagerIdCache.get(id)
   }
 }
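For readers unfamiliar with the pattern used in this patch, below is a minimal standalone sketch of the same idea outside Spark: a Guava `LoadingCache` with a `maximumSize` bound used as an interning cache, so repeated lookups of equal keys return one shared instance while the cache can no longer grow without limit the way the old `ConcurrentHashMap` could. The `WorkerId` type, the `WorkerIdInterner` object, and the 10000-entry bound are illustrative assumptions, not Spark APIs; only `CacheBuilder`, `CacheLoader`, and `LoadingCache` are real Guava classes, and Guava must be on the classpath.

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Hypothetical key type standing in for BlockManagerId; the real class also
// carries executor id, host, port and topology information.
case class WorkerId(executorId: String, host: String, port: Int)

object WorkerIdInterner {
  // Bounded interning cache: at most 10000 entries are retained, and older
  // entries are evicted once the bound is reached.
  private val cache: LoadingCache[WorkerId, WorkerId] =
    CacheBuilder.newBuilder()
      .maximumSize(10000)
      .build(new CacheLoader[WorkerId, WorkerId]() {
        // Loading a missing key simply stores the key itself, so equal ids
        // deserialized many times resolve to one shared instance.
        override def load(id: WorkerId): WorkerId = id
      })

  def intern(id: WorkerId): WorkerId = cache.get(id)
}

object InternExample extends App {
  // Two structurally equal ids resolve to the same cached instance.
  val a = WorkerIdInterner.intern(WorkerId("exec-1", "host-a", 7337))
  val b = WorkerIdInterner.intern(WorkerId("exec-1", "host-a", 7337))
  println(a eq b)  // true while the entry is still in the cache
}
```

Note that, unlike `ConcurrentHashMap.putIfAbsent` followed by `get`, a single `LoadingCache.get(id)` both inserts and returns the canonical instance atomically, which is why the patch can drop the `putIfAbsent` call.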