[SPARK-6627] Finished rename to ShuffleBlockResolver The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager to ShuffleBlockResolver, but didn't rename the associated subclasses and variables; this commit does that.
I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class? cc pwendell Author: Kay Ousterhout <kayousterh...@gmail.com> Closes #5764 from kayousterhout/SPARK-6627 and squashes the following commits: 43add1e [Kay Ousterhout] Spacing fix 96080bf [Kay Ousterhout] Test fixes d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b3bb0e4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b3bb0e4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b3bb0e4 Branch: refs/heads/master Commit: 4b3bb0e43ca7e1a27308516608419487b6a844e6 Parents: 2d05f32 Author: Kay Ousterhout <kayousterh...@gmail.com> Authored: Fri May 8 12:24:06 2015 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Fri May 8 12:24:06 2015 -0700 ---------------------------------------------------------------------- .../spark/shuffle/FileShuffleBlockManager.scala | 300 ------------------- .../shuffle/FileShuffleBlockResolver.scala | 297 ++++++++++++++++++ .../shuffle/IndexShuffleBlockManager.scala | 124 -------- .../shuffle/IndexShuffleBlockResolver.scala | 122 ++++++++ .../spark/shuffle/hash/HashShuffleManager.scala | 6 +- .../spark/shuffle/hash/HashShuffleWriter.scala | 4 +- .../spark/shuffle/sort/SortShuffleManager.scala | 6 +- .../spark/shuffle/sort/SortShuffleWriter.scala | 12 +- .../org/apache/spark/storage/BlockId.scala | 2 +- .../org/apache/spark/storage/BlockManager.scala | 5 +- .../apache/spark/storage/DiskBlockManager.scala | 2 +- .../shuffle/hash/HashShuffleManagerSuite.scala | 18 +- .../shuffle/ExternalShuffleBlockHandler.java | 6 +- .../shuffle/ExternalShuffleBlockManager.java | 254 ---------------- .../shuffle/ExternalShuffleBlockResolver.java | 254 ++++++++++++++++ .../ExternalShuffleBlockHandlerSuite.java | 16 +- .../ExternalShuffleBlockManagerSuite.java | 129 -------- .../ExternalShuffleBlockResolverSuite.java | 129 ++++++++ .../shuffle/ExternalShuffleCleanupSuite.java | 37 +-- .../network/shuffle/TestShuffleDataContext.java | 8 +- 20 files changed, 865 insertions(+), 866 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala deleted file mode 100644 index e9b4e2b..0000000 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle - -import java.io.File -import java.nio.ByteBuffer -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.JavaConversions._ - -import org.apache.spark.{Logging, SparkConf, SparkEnv} -import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} -import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup -import org.apache.spark.storage._ -import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap} -import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} - -/** A group of writers for a ShuffleMapTask, one writer per reducer. */ -private[spark] trait ShuffleWriterGroup { - val writers: Array[BlockObjectWriter] - - /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */ - def releaseWriters(success: Boolean) -} - -/** - * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file - * per reducer (this set of files is called a ShuffleFileGroup). - * - * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle - * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer - * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle - * files, it releases them for another task. - * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple: - * - shuffleId: The unique id given to the entire shuffle stage. - * - bucketId: The id of the output partition (i.e., reducer id) - * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a - * time owns a particular fileId, and this id is returned to a pool when the task finishes. - * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length) - * that specifies where in a given file the actual block data is located. - * - * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping - * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for - * each block stored in each file. In order to find the location of a shuffle block, we search the - * files within a ShuffleFileGroups associated with the block's reducer. - */ -// Note: Changes to the format in this file should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData(). -private[spark] -class FileShuffleBlockManager(conf: SparkConf) - extends ShuffleBlockResolver with Logging { - - private val transportConf = SparkTransportConf.fromSparkConf(conf) - - private lazy val blockManager = SparkEnv.get.blockManager - - // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. - // TODO: Remove this once the shuffle file consolidation feature is stable. - private val consolidateShuffleFiles = - conf.getBoolean("spark.shuffle.consolidateFiles", false) - - // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided - private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 - - /** - * Contains all the state related to a particular shuffle. This includes a pool of unused - * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle. - */ - private class ShuffleState(val numBuckets: Int) { - val nextFileId = new AtomicInteger(0) - val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() - val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() - - /** - * The mapIds of all map tasks completed on this Executor for this shuffle. - * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise. - */ - val completedMapTasks = new ConcurrentLinkedQueue[Int]() - } - - private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] - - private val metadataCleaner = - new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf) - - /** - * Get a ShuffleWriterGroup for the given map task, which will register it as complete - * when the writers are closed successfully - */ - def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer, - writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { - new ShuffleWriterGroup { - shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) - private val shuffleState = shuffleStates(shuffleId) - private var fileGroup: ShuffleFileGroup = null - - val openStartTime = System.nanoTime - val serializerInstance = serializer.newInstance() - val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { - fileGroup = getUnusedFileGroup() - Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => - val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize, - writeMetrics) - } - } else { - Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => - val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - val blockFile = blockManager.diskBlockManager.getFile(blockId) - // Because of previous failures, the shuffle file may already exist on this machine. - // If so, remove it. - if (blockFile.exists) { - if (blockFile.delete()) { - logInfo(s"Removed existing shuffle file $blockFile") - } else { - logWarning(s"Failed to remove existing shuffle file $blockFile") - } - } - blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize, - writeMetrics) - } - } - // Creating the file to write to and creating a disk writer both involve interacting with - // the disk, so should be included in the shuffle write time. - writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) - - override def releaseWriters(success: Boolean) { - if (consolidateShuffleFiles) { - if (success) { - val offsets = writers.map(_.fileSegment().offset) - val lengths = writers.map(_.fileSegment().length) - fileGroup.recordMapOutput(mapId, offsets, lengths) - } - recycleFileGroup(fileGroup) - } else { - shuffleState.completedMapTasks.add(mapId) - } - } - - private def getUnusedFileGroup(): ShuffleFileGroup = { - val fileGroup = shuffleState.unusedFileGroups.poll() - if (fileGroup != null) fileGroup else newFileGroup() - } - - private def newFileGroup(): ShuffleFileGroup = { - val fileId = shuffleState.nextFileId.getAndIncrement() - val files = Array.tabulate[File](numBuckets) { bucketId => - val filename = physicalFileName(shuffleId, bucketId, fileId) - blockManager.diskBlockManager.getFile(filename) - } - val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files) - shuffleState.allFileGroups.add(fileGroup) - fileGroup - } - - private def recycleFileGroup(group: ShuffleFileGroup) { - shuffleState.unusedFileGroups.add(group) - } - } - } - - override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { - if (consolidateShuffleFiles) { - // Search all file groups associated with this shuffle. - val shuffleState = shuffleStates(blockId.shuffleId) - val iter = shuffleState.allFileGroups.iterator - while (iter.hasNext) { - val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId) - if (segmentOpt.isDefined) { - val segment = segmentOpt.get - return new FileSegmentManagedBuffer( - transportConf, segment.file, segment.offset, segment.length) - } - } - throw new IllegalStateException("Failed to find shuffle block: " + blockId) - } else { - val file = blockManager.diskBlockManager.getFile(blockId) - new FileSegmentManagedBuffer(transportConf, file, 0, file.length) - } - } - - /** Remove all the blocks / files and metadata related to a particular shuffle. */ - def removeShuffle(shuffleId: ShuffleId): Boolean = { - // Do not change the ordering of this, if shuffleStates should be removed only - // after the corresponding shuffle blocks have been removed - val cleaned = removeShuffleBlocks(shuffleId) - shuffleStates.remove(shuffleId) - cleaned - } - - /** Remove all the blocks / files related to a particular shuffle. */ - private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = { - shuffleStates.get(shuffleId) match { - case Some(state) => - if (consolidateShuffleFiles) { - for (fileGroup <- state.allFileGroups; file <- fileGroup.files) { - file.delete() - } - } else { - for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) { - val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId) - blockManager.diskBlockManager.getFile(blockId).delete() - } - } - logInfo("Deleted all files for shuffle " + shuffleId) - true - case None => - logInfo("Could not find files for shuffle " + shuffleId + " for deleting") - false - } - } - - private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { - "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId) - } - - private def cleanup(cleanupTime: Long) { - shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId)) - } - - override def stop() { - metadataCleaner.cancel() - } -} - -private[spark] -object FileShuffleBlockManager { - /** - * A group of shuffle files, one per reducer. - * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. - */ - private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) { - private var numBlocks: Int = 0 - - /** - * Stores the absolute index of each mapId in the files of this group. For instance, - * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0. - */ - private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]() - - /** - * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by - * position in the file. - * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every - * reducer. - */ - private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) { - new PrimitiveVector[Long]() - } - private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) { - new PrimitiveVector[Long]() - } - - def apply(bucketId: Int): File = files(bucketId) - - def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) { - assert(offsets.length == lengths.length) - mapIdToIndex(mapId) = numBlocks - numBlocks += 1 - for (i <- 0 until offsets.length) { - blockOffsetsByReducer(i) += offsets(i) - blockLengthsByReducer(i) += lengths(i) - } - } - - /** Returns the FileSegment associated with the given map task, or None if no entry exists. */ - def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = { - val file = files(reducerId) - val blockOffsets = blockOffsetsByReducer(reducerId) - val blockLengths = blockLengthsByReducer(reducerId) - val index = mapIdToIndex.getOrElse(mapId, -1) - if (index >= 0) { - val offset = blockOffsets(index) - val length = blockLengths(index) - Some(new FileSegment(file, offset, length)) - } else { - None - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala new file mode 100644 index 0000000..6ad427b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io.File +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConversions._ + +import org.apache.spark.{Logging, SparkConf, SparkEnv} +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup +import org.apache.spark.storage._ +import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap} +import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} + +/** A group of writers for a ShuffleMapTask, one writer per reducer. */ +private[spark] trait ShuffleWriterGroup { + val writers: Array[BlockObjectWriter] + + /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */ + def releaseWriters(success: Boolean) +} + +/** + * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file + * per reducer (this set of files is called a ShuffleFileGroup). + * + * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle + * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer + * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle + * files, it releases them for another task. + * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple: + * - shuffleId: The unique id given to the entire shuffle stage. + * - bucketId: The id of the output partition (i.e., reducer id) + * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a + * time owns a particular fileId, and this id is returned to a pool when the task finishes. + * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length) + * that specifies where in a given file the actual block data is located. + * + * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping + * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for + * each block stored in each file. In order to find the location of a shuffle block, we search the + * files within a ShuffleFileGroups associated with the block's reducer. + */ +// Note: Changes to the format in this file should be kept in sync with +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData(). +private[spark] class FileShuffleBlockResolver(conf: SparkConf) + extends ShuffleBlockResolver with Logging { + + private val transportConf = SparkTransportConf.fromSparkConf(conf) + + private lazy val blockManager = SparkEnv.get.blockManager + + // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. + // TODO: Remove this once the shuffle file consolidation feature is stable. + private val consolidateShuffleFiles = + conf.getBoolean("spark.shuffle.consolidateFiles", false) + + // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided + private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 + + /** + * Contains all the state related to a particular shuffle. This includes a pool of unused + * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle. + */ + private class ShuffleState(val numBuckets: Int) { + val nextFileId = new AtomicInteger(0) + val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() + val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() + + /** + * The mapIds of all map tasks completed on this Executor for this shuffle. + * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise. + */ + val completedMapTasks = new ConcurrentLinkedQueue[Int]() + } + + private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] + + private val metadataCleaner = + new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf) + + /** + * Get a ShuffleWriterGroup for the given map task, which will register it as complete + * when the writers are closed successfully + */ + def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer, + writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { + new ShuffleWriterGroup { + shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) + private val shuffleState = shuffleStates(shuffleId) + private var fileGroup: ShuffleFileGroup = null + + val openStartTime = System.nanoTime + val serializerInstance = serializer.newInstance() + val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { + fileGroup = getUnusedFileGroup() + Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) + blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize, + writeMetrics) + } + } else { + Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) + val blockFile = blockManager.diskBlockManager.getFile(blockId) + // Because of previous failures, the shuffle file may already exist on this machine. + // If so, remove it. + if (blockFile.exists) { + if (blockFile.delete()) { + logInfo(s"Removed existing shuffle file $blockFile") + } else { + logWarning(s"Failed to remove existing shuffle file $blockFile") + } + } + blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize, + writeMetrics) + } + } + // Creating the file to write to and creating a disk writer both involve interacting with + // the disk, so should be included in the shuffle write time. + writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) + + override def releaseWriters(success: Boolean) { + if (consolidateShuffleFiles) { + if (success) { + val offsets = writers.map(_.fileSegment().offset) + val lengths = writers.map(_.fileSegment().length) + fileGroup.recordMapOutput(mapId, offsets, lengths) + } + recycleFileGroup(fileGroup) + } else { + shuffleState.completedMapTasks.add(mapId) + } + } + + private def getUnusedFileGroup(): ShuffleFileGroup = { + val fileGroup = shuffleState.unusedFileGroups.poll() + if (fileGroup != null) fileGroup else newFileGroup() + } + + private def newFileGroup(): ShuffleFileGroup = { + val fileId = shuffleState.nextFileId.getAndIncrement() + val files = Array.tabulate[File](numBuckets) { bucketId => + val filename = physicalFileName(shuffleId, bucketId, fileId) + blockManager.diskBlockManager.getFile(filename) + } + val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files) + shuffleState.allFileGroups.add(fileGroup) + fileGroup + } + + private def recycleFileGroup(group: ShuffleFileGroup) { + shuffleState.unusedFileGroups.add(group) + } + } + } + + override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { + if (consolidateShuffleFiles) { + // Search all file groups associated with this shuffle. + val shuffleState = shuffleStates(blockId.shuffleId) + val iter = shuffleState.allFileGroups.iterator + while (iter.hasNext) { + val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId) + if (segmentOpt.isDefined) { + val segment = segmentOpt.get + return new FileSegmentManagedBuffer( + transportConf, segment.file, segment.offset, segment.length) + } + } + throw new IllegalStateException("Failed to find shuffle block: " + blockId) + } else { + val file = blockManager.diskBlockManager.getFile(blockId) + new FileSegmentManagedBuffer(transportConf, file, 0, file.length) + } + } + + /** Remove all the blocks / files and metadata related to a particular shuffle. */ + def removeShuffle(shuffleId: ShuffleId): Boolean = { + // Do not change the ordering of this, if shuffleStates should be removed only + // after the corresponding shuffle blocks have been removed + val cleaned = removeShuffleBlocks(shuffleId) + shuffleStates.remove(shuffleId) + cleaned + } + + /** Remove all the blocks / files related to a particular shuffle. */ + private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = { + shuffleStates.get(shuffleId) match { + case Some(state) => + if (consolidateShuffleFiles) { + for (fileGroup <- state.allFileGroups; file <- fileGroup.files) { + file.delete() + } + } else { + for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) { + val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId) + blockManager.diskBlockManager.getFile(blockId).delete() + } + } + logInfo("Deleted all files for shuffle " + shuffleId) + true + case None => + logInfo("Could not find files for shuffle " + shuffleId + " for deleting") + false + } + } + + private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { + "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId) + } + + private def cleanup(cleanupTime: Long) { + shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId)) + } + + override def stop() { + metadataCleaner.cancel() + } +} + +private[spark] object FileShuffleBlockResolver { + /** + * A group of shuffle files, one per reducer. + * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. + */ + private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) { + private var numBlocks: Int = 0 + + /** + * Stores the absolute index of each mapId in the files of this group. For instance, + * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0. + */ + private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]() + + /** + * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by + * position in the file. + * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every + * reducer. + */ + private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) { + new PrimitiveVector[Long]() + } + private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) { + new PrimitiveVector[Long]() + } + + def apply(bucketId: Int): File = files(bucketId) + + def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) { + assert(offsets.length == lengths.length) + mapIdToIndex(mapId) = numBlocks + numBlocks += 1 + for (i <- 0 until offsets.length) { + blockOffsetsByReducer(i) += offsets(i) + blockLengthsByReducer(i) += lengths(i) + } + } + + /** Returns the FileSegment associated with the given map task, or None if no entry exists. */ + def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = { + val file = files(reducerId) + val blockOffsets = blockOffsetsByReducer(reducerId) + val blockLengths = blockLengthsByReducer(reducerId) + val index = mapIdToIndex.getOrElse(mapId, -1) + if (index >= 0) { + val offset = blockOffsets(index) + val length = blockLengths(index) + Some(new FileSegment(file, offset, length)) + } else { + None + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala deleted file mode 100644 index a1741e2..0000000 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle - -import java.io._ -import java.nio.ByteBuffer - -import com.google.common.io.ByteStreams - -import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} -import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.storage._ -import org.apache.spark.util.Utils - -import IndexShuffleBlockManager.NOOP_REDUCE_ID - -/** - * Create and maintain the shuffle blocks' mapping between logic block and physical file location. - * Data of shuffle blocks from the same map task are stored in a single consolidated data file. - * The offsets of the data blocks in the data file are stored in a separate index file. - * - * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data" - * as the filename postfix for data file, and ".index" as the filename postfix for index file. - * - */ -// Note: Changes to the format in this file should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData(). -private[spark] -class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver { - - private lazy val blockManager = SparkEnv.get.blockManager - - private val transportConf = SparkTransportConf.fromSparkConf(conf) - - def getDataFile(shuffleId: Int, mapId: Int): File = { - blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) - } - - private def getIndexFile(shuffleId: Int, mapId: Int): File = { - blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) - } - - /** - * Remove data file and index file that contain the output data from one map. - * */ - def removeDataByMap(shuffleId: Int, mapId: Int): Unit = { - var file = getDataFile(shuffleId, mapId) - if (file.exists()) { - file.delete() - } - - file = getIndexFile(shuffleId, mapId) - if (file.exists()) { - file.delete() - } - } - - /** - * Write an index file with the offsets of each block, plus a final offset at the end for the - * end of the output file. This will be used by getBlockLocation to figure out where each block - * begins and ends. - * */ - def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = { - val indexFile = getIndexFile(shuffleId, mapId) - val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) - Utils.tryWithSafeFinally { - // We take in lengths of each block, need to convert it to offsets. - var offset = 0L - out.writeLong(offset) - for (length <- lengths) { - offset += length - out.writeLong(offset) - } - } { - out.close() - } - } - - override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { - // The block is actually going to be a range of a single map output file for this map, so - // find out the consolidated file, then the offset within that from our index - val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) - - val in = new DataInputStream(new FileInputStream(indexFile)) - try { - ByteStreams.skipFully(in, blockId.reduceId * 8) - val offset = in.readLong() - val nextOffset = in.readLong() - new FileSegmentManagedBuffer( - transportConf, - getDataFile(blockId.shuffleId, blockId.mapId), - offset, - nextOffset - offset) - } finally { - in.close() - } - } - - override def stop(): Unit = {} -} - -private[spark] object IndexShuffleBlockManager { - // No-op reduce ID used in interactions with disk store and BlockObjectWriter. - // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort - // shuffle outputs for several reduces are glommed into a single file. - // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId. - val NOOP_REDUCE_ID = 0 -} http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala new file mode 100644 index 0000000..d9c63b6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ + +import com.google.common.io.ByteStreams + +import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.storage._ +import org.apache.spark.util.Utils + +import IndexShuffleBlockResolver.NOOP_REDUCE_ID + +/** + * Create and maintain the shuffle blocks' mapping between logic block and physical file location. + * Data of shuffle blocks from the same map task are stored in a single consolidated data file. + * The offsets of the data blocks in the data file are stored in a separate index file. + * + * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data" + * as the filename postfix for data file, and ".index" as the filename postfix for index file. + * + */ +// Note: Changes to the format in this file should be kept in sync with +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData(). +private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver { + + private lazy val blockManager = SparkEnv.get.blockManager + + private val transportConf = SparkTransportConf.fromSparkConf(conf) + + def getDataFile(shuffleId: Int, mapId: Int): File = { + blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) + } + + private def getIndexFile(shuffleId: Int, mapId: Int): File = { + blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) + } + + /** + * Remove data file and index file that contain the output data from one map. + * */ + def removeDataByMap(shuffleId: Int, mapId: Int): Unit = { + var file = getDataFile(shuffleId, mapId) + if (file.exists()) { + file.delete() + } + + file = getIndexFile(shuffleId, mapId) + if (file.exists()) { + file.delete() + } + } + + /** + * Write an index file with the offsets of each block, plus a final offset at the end for the + * end of the output file. This will be used by getBlockLocation to figure out where each block + * begins and ends. + * */ + def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = { + val indexFile = getIndexFile(shuffleId, mapId) + val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) + Utils.tryWithSafeFinally { + // We take in lengths of each block, need to convert it to offsets. + var offset = 0L + out.writeLong(offset) + for (length <- lengths) { + offset += length + out.writeLong(offset) + } + } { + out.close() + } + } + + override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { + // The block is actually going to be a range of a single map output file for this map, so + // find out the consolidated file, then the offset within that from our index + val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) + + val in = new DataInputStream(new FileInputStream(indexFile)) + try { + ByteStreams.skipFully(in, blockId.reduceId * 8) + val offset = in.readLong() + val nextOffset = in.readLong() + new FileSegmentManagedBuffer( + transportConf, + getDataFile(blockId.shuffleId, blockId.mapId), + offset, + nextOffset - offset) + } finally { + in.close() + } + } + + override def stop(): Unit = {} +} + +private[spark] object IndexShuffleBlockResolver { + // No-op reduce ID used in interactions with disk store and BlockObjectWriter. + // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort + // shuffle outputs for several reduces are glommed into a single file. + // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId. + val NOOP_REDUCE_ID = 0 +} http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala index 2a7df8d..c089088 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala @@ -26,7 +26,7 @@ import org.apache.spark.shuffle._ */ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager { - private val fileShuffleBlockManager = new FileShuffleBlockManager(conf) + private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf) /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ override def registerShuffle[K, V, C]( @@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager shuffleBlockResolver.removeShuffle(shuffleId) } - override def shuffleBlockResolver: FileShuffleBlockManager = { - fileShuffleBlockManager + override def shuffleBlockResolver: FileShuffleBlockResolver = { + fileShuffleBlockResolver } /** Shut down this ShuffleManager. */ http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index cd27c9e..897f0a5 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -25,7 +25,7 @@ import org.apache.spark.shuffle._ import org.apache.spark.storage.BlockObjectWriter private[spark] class HashShuffleWriter[K, V]( - shuffleBlockManager: FileShuffleBlockManager, + shuffleBlockResolver: FileShuffleBlockResolver, handle: BaseShuffleHandle[K, V, _], mapId: Int, context: TaskContext) @@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V]( private val blockManager = SparkEnv.get.blockManager private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) - private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, + private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, writeMetrics) /** Write a bunch of records to this task's output */ http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 0497036..1584294 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager { - private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf) + private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf) private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]() /** @@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager true } - override def shuffleBlockResolver: IndexShuffleBlockManager = { - indexShuffleBlockManager + override def shuffleBlockResolver: IndexShuffleBlockResolver = { + indexShuffleBlockResolver } /** Shut down this ShuffleManager. */ http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index a066435..add2656 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus -import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle} +import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle} import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter private[spark] class SortShuffleWriter[K, V, C]( - shuffleBlockManager: IndexShuffleBlockManager, + shuffleBlockResolver: IndexShuffleBlockResolver, handle: BaseShuffleHandle[K, V, C], mapId: Int, context: TaskContext) @@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C]( // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). - val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) - val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID) + val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) + val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) - shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) + shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } @@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C]( return Option(mapStatus) } else { // The map task failed, so delete our output data. - shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId) + shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId) return None } } finally { http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/BlockId.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index c186fd3..524f697 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { } // Format of the shuffle block ids (including data and index) should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData(). +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a46fecd..cc794e5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -431,10 +431,11 @@ private[spark] class BlockManager( // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { - val shuffleBlockManager = shuffleManager.shuffleBlockResolver + val shuffleBlockResolver = shuffleManager.shuffleBlockResolver // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. - Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) + Option( + shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) } else { doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 5764c16..2a44477 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with - // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile(). + // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala index 84384bb..0537bf6 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.shuffle.FileShuffleBlockManager +import org.apache.spark.shuffle.FileShuffleBlockResolver import org.apache.spark.storage.{ShuffleBlockId, FileSegment} class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { @@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) - val shuffleBlockManager = - SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager] + val shuffleBlockResolver = + SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver] - val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf), + val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf), new ShuffleWriteMetrics) for (writer <- shuffle1.writers) { writer.write("test1", "value") @@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { val shuffle1Segment = shuffle1.writers(0).fileSegment() shuffle1.releaseWriters(success = true) - val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf), + val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf), new ShuffleWriteMetrics) for (writer <- shuffle2.writers) { @@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { // of block based on remaining data in file : which could mess things up when there is // concurrent read and writes happening to the same shuffle group. - val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), + val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf), new ShuffleWriteMetrics) for (writer <- shuffle3.writers) { writer.write("test3", "value") @@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { writer.commitAndClose() } // check before we register. - checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0))) + checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0))) shuffle3.releaseWriters(success = true) - checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0))) - shuffleBlockManager.removeShuffle(1) + checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0))) + shuffleBlockResolver.removeShuffle(1) } def writeToFile(file: File, numBytes: Int) { http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 46ca970..e4faaf8 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -46,18 +46,18 @@ import org.apache.spark.network.shuffle.protocol.StreamHandle; public class ExternalShuffleBlockHandler extends RpcHandler { private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class); - private final ExternalShuffleBlockManager blockManager; + private final ExternalShuffleBlockResolver blockManager; private final OneForOneStreamManager streamManager; public ExternalShuffleBlockHandler(TransportConf conf) { - this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf)); + this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf)); } /** Enables mocking out the StreamManager and BlockManager. */ @VisibleForTesting ExternalShuffleBlockHandler( OneForOneStreamManager streamManager, - ExternalShuffleBlockManager blockManager) { + ExternalShuffleBlockResolver blockManager) { this.streamManager = streamManager; this.blockManager = blockManager; } http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java deleted file mode 100644 index 93e6fdd..0000000 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.shuffle; - -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.collect.Maps; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.network.buffer.FileSegmentManagedBuffer; -import org.apache.spark.network.buffer.ManagedBuffer; -import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; -import org.apache.spark.network.util.JavaUtils; -import org.apache.spark.network.util.NettyUtils; -import org.apache.spark.network.util.TransportConf; - -/** - * Manages converting shuffle BlockIds into physical segments of local files, from a process outside - * of Executors. Each Executor must register its own configuration about where it stores its files - * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated - * from Spark's FileShuffleBlockManager and IndexShuffleBlockManager. - * - * Executors with shuffle file consolidation are not currently supported, as the index is stored in - * the Executor's memory, unlike the IndexShuffleBlockManager. - */ -public class ExternalShuffleBlockManager { - private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class); - - // Map containing all registered executors' metadata. - private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors; - - // Single-threaded Java executor used to perform expensive recursive directory deletion. - private final Executor directoryCleaner; - - private final TransportConf conf; - - public ExternalShuffleBlockManager(TransportConf conf) { - this(conf, Executors.newSingleThreadExecutor( - // Add `spark` prefix because it will run in NM in Yarn mode. - NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); - } - - // Allows tests to have more control over when directories are cleaned up. - @VisibleForTesting - ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) { - this.conf = conf; - this.executors = Maps.newConcurrentMap(); - this.directoryCleaner = directoryCleaner; - } - - /** Registers a new Executor with all the configuration we need to find its shuffle files. */ - public void registerExecutor( - String appId, - String execId, - ExecutorShuffleInfo executorInfo) { - AppExecId fullId = new AppExecId(appId, execId); - logger.info("Registered executor {} with {}", fullId, executorInfo); - executors.put(fullId, executorInfo); - } - - /** - * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the - * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make - * assumptions about how the hash and sort based shuffles store their data. - */ - public ManagedBuffer getBlockData(String appId, String execId, String blockId) { - String[] blockIdParts = blockId.split("_"); - if (blockIdParts.length < 4) { - throw new IllegalArgumentException("Unexpected block id format: " + blockId); - } else if (!blockIdParts[0].equals("shuffle")) { - throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId); - } - int shuffleId = Integer.parseInt(blockIdParts[1]); - int mapId = Integer.parseInt(blockIdParts[2]); - int reduceId = Integer.parseInt(blockIdParts[3]); - - ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId)); - if (executor == null) { - throw new RuntimeException( - String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId)); - } - - if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) { - return getHashBasedShuffleBlockData(executor, blockId); - } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)) { - return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId); - } else { - throw new UnsupportedOperationException( - "Unsupported shuffle manager: " + executor.shuffleManager); - } - } - - /** - * Removes our metadata of all executors registered for the given application, and optionally - * also deletes the local directories associated with the executors of that application in a - * separate thread. - * - * It is not valid to call registerExecutor() for an executor with this appId after invoking - * this method. - */ - public void applicationRemoved(String appId, boolean cleanupLocalDirs) { - logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); - Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next(); - AppExecId fullId = entry.getKey(); - final ExecutorShuffleInfo executor = entry.getValue(); - - // Only touch executors associated with the appId that was removed. - if (appId.equals(fullId.appId)) { - it.remove(); - - if (cleanupLocalDirs) { - logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length); - - // Execute the actual deletion in a different thread, as it may take some time. - directoryCleaner.execute(new Runnable() { - @Override - public void run() { - deleteExecutorDirs(executor.localDirs); - } - }); - } - } - } - } - - /** - * Synchronously deletes each directory one at a time. - * Should be executed in its own thread, as this may take a long time. - */ - private void deleteExecutorDirs(String[] dirs) { - for (String localDir : dirs) { - try { - JavaUtils.deleteRecursively(new File(localDir)); - logger.debug("Successfully cleaned up directory: " + localDir); - } catch (Exception e) { - logger.error("Failed to delete directory: " + localDir, e); - } - } - } - - /** - * Hash-based shuffle data is simply stored as one file per block. - * This logic is from FileShuffleBlockManager. - */ - // TODO: Support consolidated hash shuffle files - private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) { - File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); - return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length()); - } - - /** - * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file - * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager, - * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId. - */ - private ManagedBuffer getSortBasedShuffleBlockData( - ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { - File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, - "shuffle_" + shuffleId + "_" + mapId + "_0.index"); - - DataInputStream in = null; - try { - in = new DataInputStream(new FileInputStream(indexFile)); - in.skipBytes(reduceId * 8); - long offset = in.readLong(); - long nextOffset = in.readLong(); - return new FileSegmentManagedBuffer( - conf, - getFile(executor.localDirs, executor.subDirsPerLocalDir, - "shuffle_" + shuffleId + "_" + mapId + "_0.data"), - offset, - nextOffset - offset); - } catch (IOException e) { - throw new RuntimeException("Failed to open file: " + indexFile, e); - } finally { - if (in != null) { - JavaUtils.closeQuietly(in); - } - } - } - - /** - * Hashes a filename into the corresponding local directory, in a manner consistent with - * Spark's DiskBlockManager.getFile(). - */ - @VisibleForTesting - static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { - int hash = JavaUtils.nonNegativeHash(filename); - String localDir = localDirs[hash % localDirs.length]; - int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; - return new File(new File(localDir, String.format("%02x", subDirId)), filename); - } - - /** Simply encodes an executor's full ID, which is appId + execId. */ - private static class AppExecId { - final String appId; - final String execId; - - private AppExecId(String appId, String execId) { - this.appId = appId; - this.execId = execId; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - AppExecId appExecId = (AppExecId) o; - return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId); - } - - @Override - public int hashCode() { - return Objects.hashCode(appId, execId); - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) - .add("execId", execId) - .toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java new file mode 100644 index 0000000..dd08e24 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.FileSegmentManagedBuffer; +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; +import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.NettyUtils; +import org.apache.spark.network.util.TransportConf; + +/** + * Manages converting shuffle BlockIds into physical segments of local files, from a process outside + * of Executors. Each Executor must register its own configuration about where it stores its files + * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated + * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver. + * + * Executors with shuffle file consolidation are not currently supported, as the index is stored in + * the Executor's memory, unlike the IndexShuffleBlockResolver. + */ +public class ExternalShuffleBlockResolver { + private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class); + + // Map containing all registered executors' metadata. + private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors; + + // Single-threaded Java executor used to perform expensive recursive directory deletion. + private final Executor directoryCleaner; + + private final TransportConf conf; + + public ExternalShuffleBlockResolver(TransportConf conf) { + this(conf, Executors.newSingleThreadExecutor( + // Add `spark` prefix because it will run in NM in Yarn mode. + NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); + } + + // Allows tests to have more control over when directories are cleaned up. + @VisibleForTesting + ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) { + this.conf = conf; + this.executors = Maps.newConcurrentMap(); + this.directoryCleaner = directoryCleaner; + } + + /** Registers a new Executor with all the configuration we need to find its shuffle files. */ + public void registerExecutor( + String appId, + String execId, + ExecutorShuffleInfo executorInfo) { + AppExecId fullId = new AppExecId(appId, execId); + logger.info("Registered executor {} with {}", fullId, executorInfo); + executors.put(fullId, executorInfo); + } + + /** + * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the + * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make + * assumptions about how the hash and sort based shuffles store their data. + */ + public ManagedBuffer getBlockData(String appId, String execId, String blockId) { + String[] blockIdParts = blockId.split("_"); + if (blockIdParts.length < 4) { + throw new IllegalArgumentException("Unexpected block id format: " + blockId); + } else if (!blockIdParts[0].equals("shuffle")) { + throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId); + } + int shuffleId = Integer.parseInt(blockIdParts[1]); + int mapId = Integer.parseInt(blockIdParts[2]); + int reduceId = Integer.parseInt(blockIdParts[3]); + + ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId)); + if (executor == null) { + throw new RuntimeException( + String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId)); + } + + if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) { + return getHashBasedShuffleBlockData(executor, blockId); + } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)) { + return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId); + } else { + throw new UnsupportedOperationException( + "Unsupported shuffle manager: " + executor.shuffleManager); + } + } + + /** + * Removes our metadata of all executors registered for the given application, and optionally + * also deletes the local directories associated with the executors of that application in a + * separate thread. + * + * It is not valid to call registerExecutor() for an executor with this appId after invoking + * this method. + */ + public void applicationRemoved(String appId, boolean cleanupLocalDirs) { + logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); + Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next(); + AppExecId fullId = entry.getKey(); + final ExecutorShuffleInfo executor = entry.getValue(); + + // Only touch executors associated with the appId that was removed. + if (appId.equals(fullId.appId)) { + it.remove(); + + if (cleanupLocalDirs) { + logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length); + + // Execute the actual deletion in a different thread, as it may take some time. + directoryCleaner.execute(new Runnable() { + @Override + public void run() { + deleteExecutorDirs(executor.localDirs); + } + }); + } + } + } + } + + /** + * Synchronously deletes each directory one at a time. + * Should be executed in its own thread, as this may take a long time. + */ + private void deleteExecutorDirs(String[] dirs) { + for (String localDir : dirs) { + try { + JavaUtils.deleteRecursively(new File(localDir)); + logger.debug("Successfully cleaned up directory: " + localDir); + } catch (Exception e) { + logger.error("Failed to delete directory: " + localDir, e); + } + } + } + + /** + * Hash-based shuffle data is simply stored as one file per block. + * This logic is from FileShuffleBlockResolver. + */ + // TODO: Support consolidated hash shuffle files + private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) { + File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); + return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length()); + } + + /** + * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file + * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver, + * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId. + */ + private ManagedBuffer getSortBasedShuffleBlockData( + ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { + File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, + "shuffle_" + shuffleId + "_" + mapId + "_0.index"); + + DataInputStream in = null; + try { + in = new DataInputStream(new FileInputStream(indexFile)); + in.skipBytes(reduceId * 8); + long offset = in.readLong(); + long nextOffset = in.readLong(); + return new FileSegmentManagedBuffer( + conf, + getFile(executor.localDirs, executor.subDirsPerLocalDir, + "shuffle_" + shuffleId + "_" + mapId + "_0.data"), + offset, + nextOffset - offset); + } catch (IOException e) { + throw new RuntimeException("Failed to open file: " + indexFile, e); + } finally { + if (in != null) { + JavaUtils.closeQuietly(in); + } + } + } + + /** + * Hashes a filename into the corresponding local directory, in a manner consistent with + * Spark's DiskBlockManager.getFile(). + */ + @VisibleForTesting + static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { + int hash = JavaUtils.nonNegativeHash(filename); + String localDir = localDirs[hash % localDirs.length]; + int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; + return new File(new File(localDir, String.format("%02x", subDirId)), filename); + } + + /** Simply encodes an executor's full ID, which is appId + execId. */ + private static class AppExecId { + final String appId; + final String execId; + + private AppExecId(String appId, String execId) { + this.appId = appId; + this.execId = execId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + AppExecId appExecId = (AppExecId) o; + return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId); + } + + @Override + public int hashCode() { + return Objects.hashCode(appId, execId); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("appId", appId) + .add("execId", execId) + .toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index 3f9fe16..73374cd 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -45,14 +45,14 @@ public class ExternalShuffleBlockHandlerSuite { TransportClient client = mock(TransportClient.class); OneForOneStreamManager streamManager; - ExternalShuffleBlockManager blockManager; + ExternalShuffleBlockResolver blockResolver; RpcHandler handler; @Before public void beforeEach() { streamManager = mock(OneForOneStreamManager.class); - blockManager = mock(ExternalShuffleBlockManager.class); - handler = new ExternalShuffleBlockHandler(streamManager, blockManager); + blockResolver = mock(ExternalShuffleBlockResolver.class); + handler = new ExternalShuffleBlockHandler(streamManager, blockResolver); } @Test @@ -62,7 +62,7 @@ public class ExternalShuffleBlockHandlerSuite { ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort"); byte[] registerMessage = new RegisterExecutor("app0", "exec1", config).toByteArray(); handler.receive(client, registerMessage, callback); - verify(blockManager, times(1)).registerExecutor("app0", "exec1", config); + verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config); verify(callback, times(1)).onSuccess((byte[]) any()); verify(callback, never()).onFailure((Throwable) any()); @@ -75,12 +75,12 @@ public class ExternalShuffleBlockHandlerSuite { ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3])); ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7])); - when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker); - when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker); + when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker); + when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker); byte[] openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }).toByteArray(); handler.receive(client, openBlocks, callback); - verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0"); - verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1"); + verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0"); + verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1"); ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class); verify(callback, times(1)).onSuccess(response.capture()); http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java deleted file mode 100644 index dad6428..0000000 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.shuffle; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; - -import com.google.common.io.CharStreams; -import org.apache.spark.network.util.SystemPropertyConfigProvider; -import org.apache.spark.network.util.TransportConf; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class ExternalShuffleBlockManagerSuite { - static String sortBlock0 = "Hello!"; - static String sortBlock1 = "World!"; - - static String hashBlock0 = "Elementary"; - static String hashBlock1 = "Tabular"; - - static TestShuffleDataContext dataContext; - - static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); - - @BeforeClass - public static void beforeAll() throws IOException { - dataContext = new TestShuffleDataContext(2, 5); - - dataContext.create(); - // Write some sort and hash data. - dataContext.insertSortShuffleData(0, 0, - new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } ); - dataContext.insertHashShuffleData(1, 0, - new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } ); - } - - @AfterClass - public static void afterAll() { - dataContext.cleanup(); - } - - @Test - public void testBadRequests() { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); - // Unregistered executor - try { - manager.getBlockData("app0", "exec1", "shuffle_1_1_0"); - fail("Should have failed"); - } catch (RuntimeException e) { - assertTrue("Bad error message: " + e, e.getMessage().contains("not registered")); - } - - // Invalid shuffle manager - manager.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar")); - try { - manager.getBlockData("app0", "exec2", "shuffle_1_1_0"); - fail("Should have failed"); - } catch (UnsupportedOperationException e) { - // pass - } - - // Nonexistent shuffle block - manager.registerExecutor("app0", "exec3", - dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); - try { - manager.getBlockData("app0", "exec3", "shuffle_1_1_0"); - fail("Should have failed"); - } catch (Exception e) { - // pass - } - } - - @Test - public void testSortShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); - manager.registerExecutor("app0", "exec0", - dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); - - InputStream block0Stream = - manager.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream(); - String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); - block0Stream.close(); - assertEquals(sortBlock0, block0); - - InputStream block1Stream = - manager.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream(); - String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); - block1Stream.close(); - assertEquals(sortBlock1, block1); - } - - @Test - public void testHashShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); - manager.registerExecutor("app0", "exec0", - dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager")); - - InputStream block0Stream = - manager.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream(); - String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); - block0Stream.close(); - assertEquals(hashBlock0, block0); - - InputStream block1Stream = - manager.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream(); - String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); - block1Stream.close(); - assertEquals(hashBlock1, block1); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org