Repository: spark
Updated Branches:
  refs/heads/master 99e32f8ba -> e1960c3d6

[SPARK-22062][CORE] Spill large block to disk in BlockManager's remote fetch to avoid OOM

## What changes were proposed in this pull request?

Currently, BlockManager's `getRemoteBytes` calls `BlockTransferService#fetchBlockSync` to get a remote block. In `fetchBlockSync`, Spark allocates a temporary `ByteBuffer` to hold the whole fetched block. This can lead to OOM if the block is too large or if several blocks are fetched simultaneously in the same executor.

This change borrows the idea from shuffle fetch: spill a large block to local disk before it is consumed by upstream code. The behavior is controlled by a newly added configuration. If the block size is not above the threshold, the block is kept in memory; otherwise it is first spilled to disk and then read back from the disk file.

To achieve this, the patch:

1. Renames `TempShuffleFileManager` to `TempFileManager`, since it is no longer used only by shuffle.
2. Adds a new `TempFileManager` to manage the files of fetched remote blocks. The files are tracked by weak references and are deleted once they are no longer in use.

## How was this patch tested?

This was tested by adding unit tests, and by manual verification in a local test that triggered GC to clean up the files.

Author: jerryshao <ss...@hortonworks.com>

Closes #19476 from jerryshao/SPARK-22062.

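
The two mechanisms described in the commit message above (the size threshold and the weak-reference cleanup of spilled files) can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from the patch: the class `RemoteBlockSpillSketch`, its nested `FileRef`, and the methods `shouldSpillToDisk` and `drainEnqueued` are hypothetical names, temp files come from `java.nio.file.Files.createTempFile` rather than Spark's `DiskBlockManager`, and the reference queue is drained on demand instead of by a background daemon thread.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.file.Files;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these names are Spark APIs.
final class RemoteBlockSpillSketch {

  /** A block is fetched to disk only when its size is strictly above the threshold. */
  static boolean shouldSpillToDisk(long blockSize, long maxSizeFetchToMem) {
    return blockSize > maxSizeFetchToMem;
  }

  /** Weak reference that remembers the path, so the file can be deleted after the File is collected. */
  static final class FileRef extends WeakReference<File> {
    private final String path;

    FileRef(File file, ReferenceQueue<File> queue) {
      super(file, queue);
      this.path = file.getAbsolutePath();
    }

    /** Returns true if the file existed and was deleted. */
    boolean cleanUp() {
      return new File(path).delete();
    }
  }

  private final ReferenceQueue<File> queue = new ReferenceQueue<>();
  // The references themselves must stay strongly reachable, otherwise they are never enqueued.
  private final Set<FileRef> refs = ConcurrentHashMap.newKeySet();

  /** Assumption: a JVM temp file stands in for a block file under Spark's local directories. */
  File createTempFile() {
    try {
      File file = Files.createTempFile("remote-block-", ".tmp").toFile();
      file.deleteOnExit(); // safety net for this sketch only
      return file;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Track the file; it becomes eligible for deletion once no strong reference to it remains. */
  boolean registerTempFileToClean(File file) {
    return refs.add(new FileRef(file, queue));
  }

  /** Delete the files whose File objects the GC has already collected; returns how many were deleted. */
  int drainEnqueued() {
    int deleted = 0;
    Reference<? extends File> ref;
    while ((ref = queue.poll()) != null) {
      FileRef fileRef = (FileRef) ref;
      refs.remove(fileRef);
      if (fileRef.cleanUp()) {
        deleted++;
      }
    }
    return deleted;
  }

  public static void main(String[] args) {
    RemoteBlockSpillSketch manager = new RemoteBlockSpillSketch();
    long threshold = 200L * 1024 * 1024; // e.g. a 200m threshold
    long blockSize = 512L * 1024 * 1024; // hypothetical block size reported for the remote block
    if (shouldSpillToDisk(blockSize, threshold)) {
      File spillFile = manager.createTempFile();
      // Same contract as in the patch: if registration fails, the caller deletes the file itself.
      if (!manager.registerTempFileToClean(spillFile)) {
        spillFile.delete();
      }
    }
    System.out.println("Files deleted in this pass: " + manager.drainEnqueued());
  }
}
```

Because deletion depends on when the garbage collector clears the weak references, the number of files removed in a given pass is not deterministic; this matches the commit message's note that cleanup was verified manually by forcing GC.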
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1960c3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1960c3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1960c3d

Branch: refs/heads/master
Commit: e1960c3d6f380b0dfbba6ee5d8ac6da4bc29a698
Parents: 99e32f8
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Oct 17 22:54:38 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Oct 17 22:54:38 2017 +0800

----------------------------------------------------------------------
 .../network/shuffle/ExternalShuffleClient.java  |   4 +-
 .../network/shuffle/OneForOneBlockFetcher.java  |  12 +--
 .../spark/network/shuffle/ShuffleClient.java    |  10 +-
 .../spark/network/shuffle/TempFileManager.java  |  36 +++++++
 .../network/shuffle/TempShuffleFileManager.java |  36 -------
 .../main/scala/org/apache/spark/SparkConf.scala |   4 +-
 .../apache/spark/internal/config/package.scala  |  15 +--
 .../spark/network/BlockTransferService.scala    |  28 +++--
 .../netty/NettyBlockTransferService.scala       |   6 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala |   2 +-
 .../org/apache/spark/storage/BlockManager.scala | 102 +++++++++++++++++--
 .../spark/storage/BlockManagerMaster.scala      |   6 ++
 .../storage/BlockManagerMasterEndpoint.scala    |  14 +++
 .../spark/storage/BlockManagerMessages.scala    |   7 ++
 .../storage/ShuffleBlockFetcherIterator.scala   |   8 +-
 .../org/apache/spark/DistributedSuite.scala     |   2 +-
 .../spark/storage/BlockManagerSuite.scala       |  57 ++++++++---
 .../ShuffleBlockFetcherIteratorSuite.scala      |  10 +-
 docs/configuration.md                           |  11 +-
 19 files changed, 266 insertions(+), 104 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 7770244..510017f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -91,7 +91,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      TempShuffleFileManager tempShuffleFileManager) {
+      TempFileManager tempFileManager) {
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
@@ -99,7 +99,7 @@ public class ExternalShuffleClient extends ShuffleClient {
           (blockIds1, listener1) -> {
             TransportClient client = clientFactory.createClient(host, port);
             new OneForOneBlockFetcher(client, appId, execId,
-              blockIds1, listener1, conf, tempShuffleFileManager).start();
+              blockIds1, listener1, conf, tempFileManager).start();
           };
 
       int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 66b67e2..3f2f20b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -58,7 +58,7 @@ public class OneForOneBlockFetcher {
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
   private final TransportConf transportConf;
-  private final TempShuffleFileManager tempShuffleFileManager;
+  private final TempFileManager tempFileManager;
 
   private StreamHandle streamHandle = null;
 
@@ -79,14 +79,14 @@ public class OneForOneBlockFetcher {
       String[] blockIds,
       BlockFetchingListener listener,
       TransportConf transportConf,
-      TempShuffleFileManager tempShuffleFileManager) {
+      TempFileManager tempFileManager) {
     this.client = client;
     this.openMessage = new OpenBlocks(appId, execId, blockIds);
     this.blockIds = blockIds;
     this.listener = listener;
     this.chunkCallback = new ChunkCallback();
     this.transportConf = transportConf;
-    this.tempShuffleFileManager = tempShuffleFileManager;
+    this.tempFileManager = tempFileManager;
   }
 
   /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -125,7 +125,7 @@ public class OneForOneBlockFetcher {
           // Immediately request all chunks -- we expect that the total size of the request is
           // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
           for (int i = 0; i < streamHandle.numChunks; i++) {
-            if (tempShuffleFileManager != null) {
+            if (tempFileManager != null) {
               client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
                 new DownloadCallback(i));
             } else {
@@ -164,7 +164,7 @@ public class OneForOneBlockFetcher {
     private int chunkIndex;
 
     DownloadCallback(int chunkIndex) throws IOException {
-      this.targetFile = tempShuffleFileManager.createTempShuffleFile();
+      this.targetFile = tempFileManager.createTempFile();
       this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
       this.chunkIndex = chunkIndex;
     }
@@ -180,7 +180,7 @@ public class OneForOneBlockFetcher {
       ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
         targetFile.length());
       listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
-      if (!tempShuffleFileManager.registerTempShuffleFileToClean(targetFile)) {
+      if (!tempFileManager.registerTempFileToClean(targetFile)) {
         targetFile.delete();
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 5bd4412..18b04fe 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -43,10 +43,10 @@ public abstract class ShuffleClient implements Closeable {
    * @param execId the executor id.
    * @param blockIds block ids to fetch.
    * @param listener the listener to receive block fetching status.
-   * @param tempShuffleFileManager TempShuffleFileManager to create and clean temp shuffle files.
-   *                               If it's not <code>null</code>, the remote blocks will be streamed
-   *                               into temp shuffle files to reduce the memory usage, otherwise,
-   *                               they will be kept in memory.
+   * @param tempFileManager TempFileManager to create and clean temp files.
+   *                        If it's not <code>null</code>, the remote blocks will be streamed
+   *                        into temp shuffle files to reduce the memory usage, otherwise,
+   *                        they will be kept in memory.
    */
   public abstract void fetchBlocks(
       String host,
@@ -54,7 +54,7 @@ public abstract class ShuffleClient implements Closeable {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      TempShuffleFileManager tempShuffleFileManager);
+      TempFileManager tempFileManager);
 
   /**
    * Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
new file mode 100644
index 0000000..552364d
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+
+/**
+ * A manager to create temp block files to reduce the memory usage and also clean temp
+ * files when they won't be used any more.
+ */
+public interface TempFileManager {
+
+  /** Create a temp block file. */
+  File createTempFile();
+
+  /**
+   * Register a temp file to clean up when it won't be used any more. Return whether the
+   * file is registered successfully. If `false`, the caller should clean up the file by itself.
+   */
+  boolean registerTempFileToClean(File file);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
deleted file mode 100644
index 84a5ed6..0000000
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.File;
-
-/**
- * A manager to create temp shuffle block files to reduce the memory usage and also clean temp
- * files when they won't be used any more.
- */
-public interface TempShuffleFileManager {
-
-  /** Create a temp shuffle block file. */
-  File createTempShuffleFile();
-
-  /**
-   * Register a temp shuffle file to clean up when it won't be used any more. Return whether the
-   * file is registered successfully. If `false`, the caller should clean up the file by itself.
-   */
-  boolean registerTempShuffleFileToClean(File file);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e61f943..57b3744 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -662,7 +662,9 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
     "spark.yarn.access.hadoopFileSystems" -> Seq(
-      AlternateConfig("spark.yarn.access.namenodes", "2.2"))
+      AlternateConfig("spark.yarn.access.namenodes", "2.2")),
+    "spark.maxRemoteBlockSizeFetchToMem" -> Seq(
+      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"))
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index efffdca..e7b406a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -357,13 +357,15 @@ package object config {
       .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
       .createWithDefault(Int.MaxValue)
 
-  private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
-    ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
-      .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
+  private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
+    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
+      .doc("Remote block will be fetched to disk when size of the block is " +
         "above this threshold. This is to avoid a giant request takes too much memory. We can " +
-        "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
-        "be enabled only when the shuffle shuffle service is newer than Spark-2.2 or the shuffle" +
-        " service is disabled.")
+        "enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
+        "affect both shuffle fetch and block manager remote block fetch. For users who " +
+        "enabled external shuffle service, this feature can only be worked when external shuffle" +
+        " service is newer than Spark 2.2.")
+      .withAlternative("spark.reducer.maxReqSizeShuffleToMem")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
 
@@ -432,5 +434,4 @@ package object config {
       .stringConf
       .toSequence
       .createOptional
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index fe5fd2d..1d8a266 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -25,8 +25,8 @@ import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.ThreadUtils
 
@@ -68,7 +68,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      tempShuffleFileManager: TempShuffleFileManager): Unit
+      tempFileManager: TempFileManager): Unit
 
   /**
    * Upload a single block to a remote node, available only after [[init]] is invoked.
@@ -87,7 +87,12 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
    *
    * It is also only available after [[init]] is invoked.
    */
-  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
+  def fetchBlockSync(
+      host: String,
+      port: Int,
+      execId: String,
+      blockId: String,
+      tempFileManager: TempFileManager): ManagedBuffer = {
     // A monitor for the thread to wait on.
     val result = Promise[ManagedBuffer]()
     fetchBlocks(host, port, execId, Array(blockId),
@@ -96,12 +101,17 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
           result.failure(exception)
         }
         override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-          val ret = ByteBuffer.allocate(data.size.toInt)
-          ret.put(data.nioByteBuffer())
-          ret.flip()
-          result.success(new NioManagedBuffer(ret))
+          data match {
+            case f: FileSegmentManagedBuffer =>
+              result.success(f)
+            case _ =>
+              val ret = ByteBuffer.allocate(data.size.toInt)
+              ret.put(data.nioByteBuffer())
+              ret.flip()
+              result.success(new NioManagedBuffer(ret))
+          }
         }
-      }, tempShuffleFileManager = null)
+      }, tempFileManager)
     ThreadUtils.awaitResult(result.future, Duration.Inf)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 6a29e18..b7d8c35 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -32,7 +32,7 @@ import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.server._
-import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempShuffleFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.UploadBlock
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
@@ -105,14 +105,14 @@ private[spark] class NettyBlockTransferService(
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      tempShuffleFileManager: TempShuffleFileManager): Unit = {
+      tempFileManager: TempFileManager): Unit = {
     logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
       val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
         override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
           val client = clientFactory.createClient(host, port)
           new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
-            transportConf, tempShuffleFileManager).start()
+            transportConf, tempFileManager).start()
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index c8d1460..0562d45 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -52,7 +52,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
       SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
       SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
-      SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM),
+      SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
       SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
 
     val serializerInstance = dep.serializer.newInstance()

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a98083d..e0276a4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.storage
 
 import java.io._
+import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference}
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
+import java.util.Collections
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
@@ -39,7 +42,7 @@ import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.shuffle.ExternalShuffleClient
+import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
@@ -203,6 +206,13 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
+  // A TempFileManager used to track all the files of remote blocks which above the
+  // specified memory threshold. Files will be deleted automatically based on weak reference.
+  // Exposed for test
+  private[storage] val remoteBlockTempFileManager =
+    new BlockManager.RemoteBlockTempFileManager(this)
+  private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+
   /**
    * Initializes the BlockManager with the given appId. This is not performed in the constructor as
    * the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -632,8 +642,8 @@ private[spark] class BlockManager(
    * Return a list of locations for the given block, prioritizing the local machine since
    * multiple block managers can share the same host, followed by hosts on the same rack.
    */
-  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
-    val locs = Random.shuffle(master.getLocations(blockId))
+  private def sortLocations(locations: Seq[BlockManagerId]): Seq[BlockManagerId] = {
+    val locs = Random.shuffle(locations)
     val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
     blockManagerId.topologyInfo match {
       case None => preferredLocs ++ otherLocs
@@ -653,7 +663,25 @@ private[spark] class BlockManager(
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0
     var totalFailureCount = 0
-    val locations = getLocations(blockId)
+
+    // Because all the remote blocks are registered in driver, it is not necessary to ask
+    // all the slave executors to get block status.
+    val locationsAndStatus = master.getLocationsAndStatus(blockId)
+    val blockSize = locationsAndStatus.map { b =>
+      b.status.diskSize.max(b.status.memSize)
+    }.getOrElse(0L)
+    val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
+
+    // If the block size is above the threshold, we should pass our FileManger to
+    // BlockTransferService, which will leverage it to spill the block; if not, then passed-in
+    // null value means the block will be persisted in memory.
+    val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
+      remoteBlockTempFileManager
+    } else {
+      null
+    }
+
+    val locations = sortLocations(blockLocations)
     val maxFetchFailures = locations.size
     var locationIterator = locations.iterator
     while (locationIterator.hasNext) {
@@ -661,7 +689,7 @@ private[spark] class BlockManager(
       logDebug(s"Getting remote block $blockId from $loc")
       val data = try {
         blockTransferService.fetchBlockSync(
-          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
+          loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
       } catch {
         case NonFatal(e) =>
           runningFailureCount += 1
@@ -684,7 +712,7 @@ private[spark] class BlockManager(
           // take a significant amount of time. To get rid of these stale entries
           // we refresh the block locations after a certain number of fetch failures
           if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
-            locationIterator = getLocations(blockId).iterator
+            locationIterator = sortLocations(master.getLocations(blockId)).iterator
             logDebug(s"Refreshed locations from the driver " +
               s"after ${runningFailureCount} fetch failures.")
             runningFailureCount = 0
@@ -1512,6 +1540,7 @@ private[spark] class BlockManager(
       // Closing should be idempotent, but maybe not for the NioBlockTransferService.
       shuffleClient.close()
     }
+    remoteBlockTempFileManager.stop()
     diskBlockManager.stop()
     rpcEnv.stop(slaveEndpoint)
     blockInfoManager.clear()
@@ -1552,4 +1581,65 @@ private[spark] object BlockManager {
     override val metricRegistry = new MetricRegistry
     metricRegistry.registerAll(metricSet)
   }
+
+  class RemoteBlockTempFileManager(blockManager: BlockManager)
+      extends TempFileManager with Logging {
+
+    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
+        extends WeakReference[File](file, referenceQueue) {
+      private val filePath = file.getAbsolutePath
+
+      def cleanUp(): Unit = {
+        logDebug(s"Clean up file $filePath")
+
+        if (!new File(filePath).delete()) {
+          logDebug(s"Fail to delete file $filePath")
+        }
+      }
+    }
+
+    private val referenceQueue = new JReferenceQueue[File]
+    private val referenceBuffer = Collections.newSetFromMap[ReferenceWithCleanup](
+      new ConcurrentHashMap)
+
+    private val POLL_TIMEOUT = 1000
+    @volatile private var stopped = false
+
+    private val cleaningThread = new Thread() { override def run() { keepCleaning() } }
+    cleaningThread.setDaemon(true)
+    cleaningThread.setName("RemoteBlock-temp-file-clean-thread")
+    cleaningThread.start()
+
+    override def createTempFile(): File = {
+      blockManager.diskBlockManager.createTempLocalBlock()._2
+    }
+
+    override def registerTempFileToClean(file: File): Boolean = {
+      referenceBuffer.add(new ReferenceWithCleanup(file, referenceQueue))
+    }
+
+    def stop(): Unit = {
+      stopped = true
+      cleaningThread.interrupt()
+      cleaningThread.join()
+    }
+
+    private def keepCleaning(): Unit = {
+      while (!stopped) {
+        try {
+          Option(referenceQueue.remove(POLL_TIMEOUT))
+            .map(_.asInstanceOf[ReferenceWithCleanup])
+            .foreach { ref =>
+              referenceBuffer.remove(ref)
+              ref.cleanUp()
+            }
+        } catch {
+          case _: InterruptedException =>
+            // no-op
+          case NonFatal(e) =>
+            logError("Error in cleaning thread", e)
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 8b1dc0b..d24421b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -84,6 +84,12 @@ class BlockManagerMaster(
     driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
   }
 
+  /** Get locations as well as status of the blockId from the driver */
+  def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
+    driverEndpoint.askSync[Option[BlockLocationsAndStatus]](
+      GetLocationsAndStatus(blockId))
+  }
+
   /** Get locations of multiple blockIds from the driver */
   def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
     driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index df0a5f5..56d0266 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -82,6 +82,9 @@ class BlockManagerMasterEndpoint(
     case GetLocations(blockId) =>
       context.reply(getLocations(blockId))
 
+    case GetLocationsAndStatus(blockId) =>
+      context.reply(getLocationsAndStatus(blockId))
+
     case GetLocationsMultipleBlockIds(blockIds) =>
       context.reply(getLocationsMultipleBlockIds(blockIds))
 
@@ -422,6 +425,17 @@ class BlockManagerMasterEndpoint(
     if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
   }
 
+  private def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
+    val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
+    val status = locations.headOption.flatMap { bmId => blockManagerInfo(bmId).getStatus(blockId) }
+
+    if (locations.nonEmpty && status.isDefined) {
+      Some(BlockLocationsAndStatus(locations, status.get))
+    } else {
+      None
+    }
+  }
+
   private def getLocationsMultipleBlockIds(
       blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
     blockIds.map(blockId => getLocations(blockId))

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 0c0ff14..1bbe7a5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -93,6 +93,13 @@ private[spark] object BlockManagerMessages {
 
   case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster
 
+  case class GetLocationsAndStatus(blockId: BlockId) extends ToBlockManagerMaster
+
+  // The response message of `GetLocationsAndStatus` request.
+  case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) {
+    assert(locations.nonEmpty)
+  }
+
   case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
 
   case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 2d176b6..98b5a73 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
@@ -69,7 +69,7 @@ final class ShuffleBlockFetcherIterator(
     maxBlocksInFlightPerAddress: Int,
     maxReqSizeShuffleToMem: Long,
     detectCorrupt: Boolean)
-  extends Iterator[(BlockId, InputStream)] with TempShuffleFileManager with Logging {
+  extends Iterator[(BlockId, InputStream)] with TempFileManager with Logging {
 
   import ShuffleBlockFetcherIterator._
 
@@ -162,11 +162,11 @@ final class ShuffleBlockFetcherIterator(
     currentResult = null
   }
 
-  override def createTempShuffleFile(): File = {
+  override def
createTempFile(): File = { blockManager.diskBlockManager.createTempLocalBlock()._2 } - override def registerTempShuffleFileToClean(file: File): Boolean = synchronized { + override def registerTempFileToClean(file: File): Boolean = synchronized { if (isZombie) { false } else { http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/test/scala/org/apache/spark/DistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index bea67b7..f800561 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -171,7 +171,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val serializerManager = SparkEnv.get.serializerManager blockManager.master.getLocations(blockId).foreach { cmId => val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, - blockId.toString) + blockId.toString, null) val deserialized = serializerManager.dataDeserializeStream(blockId, new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream())(data.elementClassTag).toList assert(deserialized === (1 to 100).toList) http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index cfe89fd..d45c194 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.storage -import java.io.File import java.nio.ByteBuffer import scala.collection.JavaConverters._ @@ -45,14 +44,14 @@ 
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap} -import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager} +import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager} import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor} import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager} import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat +import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer @@ -512,8 +511,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3)) val blockManager = makeBlockManager(128, "exec", bmMaster) - val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) - val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) + val sortLocations = PrivateMethod[Seq[BlockManagerId]]('sortLocations) + val locations = blockManager invokePrivate sortLocations(bmMaster.getLocations("test")) assert(locations.map(_.host) === Seq(localHost, localHost, otherHost)) } @@ -535,8 +534,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockManager = makeBlockManager(128, "exec", bmMaster) blockManager.blockManagerId = 
BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack)) - val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) - val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) + val sortLocations = PrivateMethod[Seq[BlockManagerId]]('sortLocations) + val locations = blockManager invokePrivate sortLocations(bmMaster.getLocations("test")) assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost)) assert(locations.flatMap(_.topologyInfo) === Seq(localRack, localRack, localRack, otherRack, otherRack)) @@ -1274,13 +1273,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // so that we have a chance to do location refresh val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh) .map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) } - when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockManagerIds) + when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn( + Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty))) + when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn( + blockManagerIds) + store = makeBlockManager(8000, "executor1", mockBlockManagerMaster, transferService = Option(mockBlockTransferService)) val block = store.getRemoteBytes("item") .asInstanceOf[Option[ByteBuffer]] assert(block.isDefined) - verify(mockBlockManagerMaster, times(2)).getLocations("item") + verify(mockBlockManagerMaster, times(1)).getLocationsAndStatus("item") + verify(mockBlockManagerMaster, times(1)).getLocations("item") } test("SPARK-17484: block status is properly updated following an exception in put()") { @@ -1371,8 +1375,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE server.close() } + test("fetch remote block to local disk if block size is larger than threshold") { + conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1000L) + + val 
mockBlockManagerMaster = mock(classOf[BlockManagerMaster]) + val mockBlockTransferService = new MockBlockTransferService(0) + val blockLocations = Seq(BlockManagerId("id-0", "host-0", 1)) + val blockStatus = BlockStatus(StorageLevel.DISK_ONLY, 0L, 2000L) + + when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn( + Option(BlockLocationsAndStatus(blockLocations, blockStatus))) + when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations) + + store = makeBlockManager(8000, "executor1", mockBlockManagerMaster, + transferService = Option(mockBlockTransferService)) + val block = store.getRemoteBytes("item") + .asInstanceOf[Option[ByteBuffer]] + + assert(block.isDefined) + assert(mockBlockTransferService.numCalls === 1) + // assert FileManager is not null if the block size is larger than threshold. + assert(mockBlockTransferService.tempFileManager === store.remoteBlockTempFileManager) + } + class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { var numCalls = 0 + var tempFileManager: TempFileManager = null override def init(blockDataManager: BlockDataManager): Unit = {} @@ -1382,7 +1410,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE execId: String, blockIds: Array[String], listener: BlockFetchingListener, - tempShuffleFileManager: TempShuffleFileManager): Unit = { + tempFileManager: TempFileManager): Unit = { listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1))) } @@ -1394,7 +1422,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE override def uploadBlock( hostname: String, - port: Int, execId: String, + port: Int, + execId: String, blockId: BlockId, blockData: ManagedBuffer, level: StorageLevel, @@ -1407,12 +1436,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE host: String, port: Int, execId: String, - blockId: String): ManagedBuffer = { + 
blockId: String, + tempFileManager: TempFileManager): ManagedBuffer = { numCalls += 1 + this.tempFileManager = tempFileManager if (numCalls <= maxFailures) { throw new RuntimeException("Failing block fetch in the mock block transfer service") } - super.fetchBlockSync(host, port, execId, blockId) + super.fetchBlockSync(host, port, execId, blockId, tempFileManager) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index c371cbc..5bfe990 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -33,7 +33,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.{SparkFunSuite, TaskContext} import org.apache.spark.network._ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} -import org.apache.spark.network.shuffle.{BlockFetchingListener, TempShuffleFileManager} +import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager} import org.apache.spark.network.util.LimitedInputStream import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util.Utils @@ -437,12 +437,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT val remoteBlocks = Map[BlockId, ManagedBuffer]( ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer()) val transfer = mock(classOf[BlockTransferService]) - var tempShuffleFileManager: TempShuffleFileManager = null + var tempFileManager: TempFileManager = null when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())) .thenAnswer(new Answer[Unit] { override def 
answer(invocation: InvocationOnMock): Unit = { val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] - tempShuffleFileManager = invocation.getArguments()(5).asInstanceOf[TempShuffleFileManager] + tempFileManager = invocation.getArguments()(5).asInstanceOf[TempFileManager] Future { listener.onBlockFetchSuccess( ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0))) @@ -472,13 +472,13 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT fetchShuffleBlock(blocksByAddress1) // `maxReqSizeShuffleToMem` is 200, which is greater than the block size 100, so don't fetch // shuffle block to disk. - assert(tempShuffleFileManager == null) + assert(tempFileManager == null) val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 300L)).toSeq)) fetchShuffleBlock(blocksByAddress2) // `maxReqSizeShuffleToMem` is 200, which is smaller than the block size 300, so fetch // shuffle block to disk. - assert(tempShuffleFileManager != null) + assert(tempFileManager != null) } } http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 7a777d3..bb06c8f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -547,13 +547,14 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td><code>spark.reducer.maxReqSizeShuffleToMem</code></td> + <td><code>spark.maxRemoteBlockSizeFetchToMem</code></td> <td>Long.MaxValue</td> <td> - The blocks of a shuffle request will be fetched to disk when size of the request is above - this threshold. This is to avoid a giant request takes too much memory. We can enable this - config by setting a specific value(e.g. 200m). 
Note that this config can be enabled only when - the shuffle shuffle service is newer than Spark-2.2 or the shuffle service is disabled. + The remote block will be fetched to disk when size of the block is above this threshold. + This is to avoid a giant request takes too much memory. We can enable this config by setting + a specific value(e.g. 200m). Note this configuration will affect both shuffle fetch + and block manager remote block fetch. For users who enabled external shuffle service, + this feature can only be worked when external shuffle service is newer than Spark 2.2. </td> </tr> <tr>
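The threshold rule described in the `docs/configuration.md` hunk, and the "reply only when both locations and status exist" shape of `getLocationsAndStatus`, can be illustrated with a small standalone sketch. This is not the code from the commit: `RemoteFetchSketch`, `FetchTarget`, `chooseFetchTarget`, `LocationsAndStatus`, and `locationsAndStatus` are hypothetical names, block managers are reduced to plain strings, and the strict "greater than" comparison is an assumption read from the doc wording "above this threshold".

```scala
// Hypothetical, self-contained sketch; none of these names are Spark APIs.
object RemoteFetchSketch {
  sealed trait FetchTarget
  case object ToMemory extends FetchTarget
  case object ToDisk extends FetchTarget

  // Assumed rule: a block strictly larger than the threshold is fetched to disk,
  // anything else stays in memory.
  def chooseFetchTarget(blockSize: Long, maxFetchToMem: Long): FetchTarget =
    if (blockSize > maxFetchToMem) ToDisk else ToMemory

  // Simplified stand-in for the reply message: locations are plain strings and
  // the status is reduced to a single size in bytes.
  final case class LocationsAndStatus(locations: Seq[String], size: Long)

  // Same shape as the endpoint method in the diff: return a value only when
  // there is at least one location and a status is known, otherwise None.
  def locationsAndStatus(
      locations: Seq[String],
      size: Option[Long]): Option[LocationsAndStatus] =
    for {
      s <- size
      if locations.nonEmpty
    } yield LocationsAndStatus(locations, s)

  def main(args: Array[String]): Unit = {
    // Sizes taken from the new test: threshold 1000, block size 2000.
    println(chooseFetchTarget(2000L, 1000L))
    // With the default threshold of Long.MaxValue nothing is fetched to disk.
    println(chooseFetchTarget(2000L, Long.MaxValue))
    println(locationsAndStatus(Seq("host-0"), Some(2000L)))
    println(locationsAndStatus(Seq.empty, Some(2000L)))
  }
}
```

With the documented default of `Long.MaxValue`, the comparison can never be true, which matches the doc's statement that the feature is enabled only by setting a specific value such as `200m`.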