Repository: spark Updated Branches: refs/heads/master d6b30edd4 -> 1662e9311
[SPARK-21501] Change CacheLoader to limit entries based on memory footprint Right now the spark shuffle service has a cache for index files. It is based on a # of files cached (spark.shuffle.service.index.cache.entries). This can cause issues if people have a lot of reducers because the size of each entry can fluctuate based on the # of reducers. We saw an issue with a job that had 170000 reducers, and it caused the NM with the spark shuffle service to use 700-800MB of memory in the NM by itself. We should change this cache to be memory-based and only allow a certain amount of memory to be used. When I say memory-based I mean the cache should have a limit of, say, 100MB. https://issues.apache.org/jira/browse/SPARK-21501 Manual testing with 170000 reducers has been performed with the cache loaded up to the max 100MB default limit, with each shuffle index file of size 1.3MB. Eviction takes place as soon as the total cache size reaches the 100MB limit, and the objects will be ready for garbage collection, thereby preventing the NM from crashing. No notable difference in runtime has been observed. Author: Sanket Chintapalli <schin...@yahoo-inc.com> Closes #18940 from redsanket/SPARK-21501. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1662e931 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1662e931 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1662e931 Branch: refs/heads/master Commit: 1662e93119d68498942386906de309d35f4a135f Parents: d6b30ed Author: Sanket Chintapalli <schin...@yahoo-inc.com> Authored: Wed Aug 23 11:51:11 2017 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Wed Aug 23 11:51:11 2017 -0500 ---------------------------------------------------------------------- .../org/apache/spark/network/util/TransportConf.java | 4 ++++ .../network/shuffle/ExternalShuffleBlockResolver.java | 11 +++++++++-- .../spark/network/shuffle/ShuffleIndexInformation.java | 11 ++++++++++- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 +++- docs/configuration.md | 6 +++--- 5 files changed, 29 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 88256b8..fa2ff42 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -67,6 +67,10 @@ public class TransportConf { return conf.getInt(name, defaultValue); } + public String get(String name, String defaultValue) { + return conf.get(name, defaultValue); + } + private String getConfKey(String suffix) { return "spark." + module + "." 
+ suffix; } http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index d7ec0e2..e639989 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -33,6 +33,7 @@ import com.google.common.base.Objects; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.Weigher; import com.google.common.collect.Maps; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; @@ -104,7 +105,7 @@ public class ExternalShuffleBlockResolver { Executor directoryCleaner) throws IOException { this.conf = conf; this.registeredExecutorFile = registeredExecutorFile; - int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024); + String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m"); CacheLoader<File, ShuffleIndexInformation> indexCacheLoader = new CacheLoader<File, ShuffleIndexInformation>() { public ShuffleIndexInformation load(File file) throws IOException { @@ -112,7 +113,13 @@ public class ExternalShuffleBlockResolver { } }; shuffleIndexCache = CacheBuilder.newBuilder() - .maximumSize(indexCacheEntries).build(indexCacheLoader); + .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize)) + .weigher(new Weigher<File, ShuffleIndexInformation>() { + public int weigh(File file, ShuffleIndexInformation indexInfo) { + return indexInfo.getSize(); + } + }) + 
.build(indexCacheLoader); db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper); if (db != null) { executors = reloadRegisteredExecutors(db); http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java index 39ca9ba..386738e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java @@ -31,9 +31,10 @@ import java.nio.file.Files; public class ShuffleIndexInformation { /** offsets as long buffer */ private final LongBuffer offsets; + private int size; public ShuffleIndexInformation(File indexFile) throws IOException { - int size = (int)indexFile.length(); + size = (int)indexFile.length(); ByteBuffer buffer = ByteBuffer.allocate(size); offsets = buffer.asLongBuffer(); DataInputStream dis = null; @@ -48,6 +49,14 @@ public class ShuffleIndexInformation { } /** + * Size of the index file + * @return size + */ + public int getSize() { + return size; + } + + /** * Get index offset for a particular reducer. 
*/ public ShuffleIndexRecord getIndex(int reduceId) { http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/core/src/main/scala/org/apache/spark/SparkConf.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 715cfdc..e61f943 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging { DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0", "Please use the new blacklisting options, spark.blacklist.*"), DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"), - DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more") + DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"), + DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0", + "Not used any more. Please use spark.shuffle.service.index.cache.size") ) Map(configs.map { cfg => (cfg.key -> cfg) } : _*) http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index e7c0306..6e9fe59 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -627,10 +627,10 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td><code>spark.shuffle.service.index.cache.entries</code></td> - <td>1024</td> + <td><code>spark.shuffle.service.index.cache.size</code></td> + <td>100m</td> <td> - Max number of entries to keep in the index cache of the shuffle service. + Cache entries limited to the specified memory footprint. 
</td> </tr> <tr> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org