[SPARK-2288] Hide ShuffleBlockManager behind ShuffleManager

By hiding the ShuffleBlockManager behind the ShuffleManager, we decouple the
shuffle data's block mapping management from the DiskBlockManager. This gives
a cleaner interface and makes it easier for other shuffle managers to
implement their own block management logic. The JIRA ticket has more details.
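
For readers skimming the diff below: the new seam is the ShuffleBlockManager
trait under org.apache.spark.shuffle. Each ShuffleManager now owns one and
exposes it through a shuffleBlockManager method, so BlockManager resolves
shuffle blocks through the shuffle manager instead of through DiskBlockManager.
As a minimal, illustrative sketch (not part of this patch), a trivial
implementation of the new trait could look like the following; the
NoopShuffleBlockManager name is hypothetical, and the code assumes a Spark core
of this vintage on the classpath.

    package org.apache.spark.shuffle

    import java.nio.ByteBuffer

    import org.apache.spark.storage.{FileSegment, ShuffleBlockId}

    // Illustrative only: every shuffle block resolves to an empty in-memory
    // buffer. Real implementations map block ids to per-reducer files
    // (FileShuffleBlockManager) or to ranges of a consolidated data file
    // described by an index file (IndexShuffleBlockManager).
    private[spark] class NoopShuffleBlockManager extends ShuffleBlockManager {

      override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] =
        Some(ByteBuffer.allocate(0))

      override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] =
        Right(ByteBuffer.allocate(0))

      override def stop(): Unit = { }
    }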

Author: Raymond Liu <raymond....@intel.com>

Closes #1241 from colorant/shuffle and squashes the following commits:

0e01ae3 [Raymond Liu] Move ShuffleBlockmanager behind shuffleManager


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acea9280
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acea9280
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acea9280

Branch: refs/heads/master
Commit: acea92806c91535162a9fdcb1cce579e7f1f91c7
Parents: 7e662af
Author: Raymond Liu <raymond....@intel.com>
Authored: Fri Aug 29 23:05:18 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Fri Aug 29 23:05:18 2014 -0700

----------------------------------------------------------------------
 .../spark/shuffle/FileShuffleBlockManager.scala | 296 ++++++++++++++++++
 .../shuffle/IndexShuffleBlockManager.scala      | 121 ++++++++
 .../spark/shuffle/ShuffleBlockManager.scala     |  38 +++
 .../apache/spark/shuffle/ShuffleManager.scala   |   9 +-
 .../spark/shuffle/hash/HashShuffleManager.scala |  18 +-
 .../spark/shuffle/hash/HashShuffleWriter.scala  |  10 +-
 .../spark/shuffle/sort/SortShuffleManager.scala |  49 +--
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  32 +-
 .../spark/storage/BlockFetcherIterator.scala    |   2 +-
 .../org/apache/spark/storage/BlockId.scala      |   8 +
 .../org/apache/spark/storage/BlockManager.scala |  22 +-
 .../spark/storage/BlockManagerSlaveActor.scala  |   4 +-
 .../apache/spark/storage/DiskBlockManager.scala |  34 +--
 .../org/apache/spark/storage/DiskStore.scala    |  41 ++-
 .../spark/storage/ShuffleBlockManager.scala     | 303 -------------------
 .../spark/storage/TachyonBlockManager.scala     |   4 +-
 .../spark/util/collection/ExternalSorter.scala  |  38 +--
 .../shuffle/hash/HashShuffleManagerSuite.scala  | 111 +++++++
 .../storage/BlockFetcherIteratorSuite.scala     |  36 +--
 .../spark/storage/BlockManagerSuite.scala       |   7 +-
 .../spark/storage/DiskBlockManagerSuite.scala   | 143 +--------
 project/MimaExcludes.scala                      |   2 +
 .../apache/spark/tools/StoragePerfTester.scala  |   9 +-
 23 files changed, 725 insertions(+), 612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
new file mode 100644
index 0000000..76e3932
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.{SparkEnv, SparkConf, Logging}
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.storage._
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, 
TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, 
PrimitiveVector}
+
+/** A group of writers for a ShuffleMapTask, one writer per reducer. */
+private[spark] trait ShuffleWriterGroup {
+  val writers: Array[BlockObjectWriter]
+
+  /** @param success Indicates all writes were successful. If false, no blocks 
will be recorded. */
+  def releaseWriters(success: Boolean)
+}
+
+/**
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle 
task gets one file
+ * per reducer (this set of files is called a ShuffleFileGroup).
+ *
+ * As an optimization to reduce the number of physical shuffle files produced, 
multiple shuffle
+ * blocks are aggregated into the same file. There is one "combined shuffle 
file" per reducer
+ * per concurrently executing shuffle task. As soon as a task finishes writing 
to its shuffle
+ * files, it releases them for another task.
+ * Regarding the implementation of this feature, shuffle files are identified 
by a 3-tuple:
+ *   - shuffleId: The unique id given to the entire shuffle stage.
+ *   - bucketId: The id of the output partition (i.e., reducer id)
+ *   - fileId: The unique id identifying a group of "combined shuffle files." 
Only one task at a
+ *       time owns a particular fileId, and this id is returned to a pool when 
the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple 
(file, offset, length)
+ * that specifies where in a given file the actual block data is located.
+ *
+ * Shuffle file metadata is stored in a space-efficient manner. Rather than 
simply mapping
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a 
list of offsets for
+ * each block stored in each file. In order to find the location of a shuffle 
block, we search the
+ * files within the ShuffleFileGroups associated with the block's reducer.
+ */
+
+private[spark]
+class FileShuffleBlockManager(conf: SparkConf)
+  extends ShuffleBlockManager with Logging {
+
+  private lazy val blockManager = SparkEnv.get.blockManager
+
+  // Turning off shuffle file consolidation causes all shuffle Blocks to get 
their own file.
+  // TODO: Remove this once the shuffle file consolidation feature is stable.
+  private val consolidateShuffleFiles =
+    conf.getBoolean("spark.shuffle.consolidateFiles", false)
+
+  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 
1024
+
+  /**
+   * Contains all the state related to a particular shuffle. This includes a 
pool of unused
+   * ShuffleFileGroups, as well as all ShuffleFileGroups that have been 
created for the shuffle.
+   */
+  private class ShuffleState(val numBuckets: Int) {
+    val nextFileId = new AtomicInteger(0)
+    val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+    val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+
+    /**
+     * The mapIds of all map tasks completed on this Executor for this shuffle.
+     * NB: This is only populated if consolidateShuffleFiles is FALSE. We 
don't need it otherwise.
+     */
+    val completedMapTasks = new ConcurrentLinkedQueue[Int]()
+  }
+
+  private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
+
+  private val metadataCleaner =
+    new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, 
this.cleanup, conf)
+
+  /**
+   * Get a ShuffleWriterGroup for the given map task, which will register it 
as complete
+   * when the writers are closed successfully
+   */
+  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: 
Serializer,
+      writeMetrics: ShuffleWriteMetrics) = {
+    new ShuffleWriterGroup {
+      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
+      private val shuffleState = shuffleStates(shuffleId)
+      private var fileGroup: ShuffleFileGroup = null
+
+      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
+        fileGroup = getUnusedFileGroup()
+        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, 
bufferSize,
+            writeMetrics)
+        }
+      } else {
+        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+          val blockFile = blockManager.diskBlockManager.getFile(blockId)
+          // Because of previous failures, the shuffle file may already exist 
on this machine.
+          // If so, remove it.
+          if (blockFile.exists) {
+            if (blockFile.delete()) {
+              logInfo(s"Removed existing shuffle file $blockFile")
+            } else {
+              logWarning(s"Failed to remove existing shuffle file $blockFile")
+            }
+          }
+          blockManager.getDiskWriter(blockId, blockFile, serializer, 
bufferSize, writeMetrics)
+        }
+      }
+
+      override def releaseWriters(success: Boolean) {
+        if (consolidateShuffleFiles) {
+          if (success) {
+            val offsets = writers.map(_.fileSegment().offset)
+            val lengths = writers.map(_.fileSegment().length)
+            fileGroup.recordMapOutput(mapId, offsets, lengths)
+          }
+          recycleFileGroup(fileGroup)
+        } else {
+          shuffleState.completedMapTasks.add(mapId)
+        }
+      }
+
+      private def getUnusedFileGroup(): ShuffleFileGroup = {
+        val fileGroup = shuffleState.unusedFileGroups.poll()
+        if (fileGroup != null) fileGroup else newFileGroup()
+      }
+
+      private def newFileGroup(): ShuffleFileGroup = {
+        val fileId = shuffleState.nextFileId.getAndIncrement()
+        val files = Array.tabulate[File](numBuckets) { bucketId =>
+          val filename = physicalFileName(shuffleId, bucketId, fileId)
+          blockManager.diskBlockManager.getFile(filename)
+        }
+        val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files)
+        shuffleState.allFileGroups.add(fileGroup)
+        fileGroup
+      }
+
+      private def recycleFileGroup(group: ShuffleFileGroup) {
+        shuffleState.unusedFileGroups.add(group)
+      }
+    }
+  }
+
+  /**
+   * Returns the physical file segment in which the given BlockId is located.
+   */
+  private def getBlockLocation(id: ShuffleBlockId): FileSegment = {
+    if (consolidateShuffleFiles) {
+      // Search all file groups associated with this shuffle.
+      val shuffleState = shuffleStates(id.shuffleId)
+      val iter = shuffleState.allFileGroups.iterator
+      while (iter.hasNext) {
+        val segment = iter.next.getFileSegmentFor(id.mapId, id.reduceId)
+        if (segment.isDefined) { return segment.get }
+      }
+      throw new IllegalStateException("Failed to find shuffle block: " + id)
+    } else {
+      val file = blockManager.diskBlockManager.getFile(id)
+      new FileSegment(file, 0, file.length())
+    }
+  }
+
+  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
+    val segment = getBlockLocation(blockId)
+    blockManager.diskStore.getBytes(segment)
+  }
+
+  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, 
ByteBuffer] = {
+    Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]))
+  }
+
+  /** Remove all the blocks / files and metadata related to a particular 
shuffle. */
+  def removeShuffle(shuffleId: ShuffleId): Boolean = {
+    // Do not change the ordering of this; shuffleStates should be removed only
+    // after the corresponding shuffle blocks have been removed.
+    val cleaned = removeShuffleBlocks(shuffleId)
+    shuffleStates.remove(shuffleId)
+    cleaned
+  }
+
+  /** Remove all the blocks / files related to a particular shuffle. */
+  private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
+    shuffleStates.get(shuffleId) match {
+      case Some(state) =>
+        if (consolidateShuffleFiles) {
+          for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
+            file.delete()
+          }
+        } else {
+          for (mapId <- state.completedMapTasks; reduceId <- 0 until 
state.numBuckets) {
+            val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
+            blockManager.diskBlockManager.getFile(blockId).delete()
+          }
+        }
+        logInfo("Deleted all files for shuffle " + shuffleId)
+        true
+      case None =>
+        logInfo("Could not find files for shuffle " + shuffleId + " for 
deleting")
+        false
+    }
+  }
+
+  private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
+    "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
+  }
+
+  private def cleanup(cleanupTime: Long) {
+    shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => 
removeShuffleBlocks(shuffleId))
+  }
+
+  override def stop() {
+    metadataCleaner.cancel()
+  }
+}
+
+private[spark]
+object FileShuffleBlockManager {
+  /**
+   * A group of shuffle files, one per reducer.
+   * A particular mapper will be assigned a single ShuffleFileGroup to write 
its output to.
+   */
+  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val 
files: Array[File]) {
+    private var numBlocks: Int = 0
+
+    /**
+     * Stores the absolute index of each mapId in the files of this group. For 
instance,
+     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
+     */
+    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+
+    /**
+     * Stores consecutive offsets and lengths of blocks into each reducer 
file, ordered by
+     * position in the file.
+     * Note: mapIdToIndex(mapId) returns the index of the mapper into the 
vector for every
+     * reducer.
+     */
+    private val blockOffsetsByReducer = 
Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
+    private val blockLengthsByReducer = 
Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
+
+    def apply(bucketId: Int) = files(bucketId)
+
+    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: 
Array[Long]) {
+      assert(offsets.length == lengths.length)
+      mapIdToIndex(mapId) = numBlocks
+      numBlocks += 1
+      for (i <- 0 until offsets.length) {
+        blockOffsetsByReducer(i) += offsets(i)
+        blockLengthsByReducer(i) += lengths(i)
+      }
+    }
+
+    /** Returns the FileSegment associated with the given map task, or None if 
no entry exists. */
+    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
+      val file = files(reducerId)
+      val blockOffsets = blockOffsetsByReducer(reducerId)
+      val blockLengths = blockLengthsByReducer(reducerId)
+      val index = mapIdToIndex.getOrElse(mapId, -1)
+      if (index >= 0) {
+        val offset = blockOffsets(index)
+        val length = blockLengths(index)
+        Some(new FileSegment(file, offset, length))
+      } else {
+        None
+      }
+    }
+  }
+}
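
The class comment above describes consolidation in terms of a pool of
ShuffleFileGroups: only one running task owns a given fileId at a time, and the
group goes back to the pool when the task finishes so later tasks append to the
same physical files. A standalone sketch of just that pooling idea (plain
Scala, not Spark API; all names here are made up for illustration):

    import java.util.concurrent.ConcurrentLinkedQueue
    import java.util.concurrent.atomic.AtomicInteger

    object FileGroupPoolSketch {
      // Stand-in for a ShuffleFileGroup: one set of per-reducer files per fileId.
      final case class FileGroup(fileId: Int)

      private val nextFileId = new AtomicInteger(0)
      private val unused = new ConcurrentLinkedQueue[FileGroup]()

      // A map task checks out an unused group, or creates a new one if none is free.
      def checkout(): FileGroup =
        Option(unused.poll()).getOrElse(FileGroup(nextFileId.getAndIncrement()))

      // When the task finishes writing, the group is recycled for the next task.
      def release(group: FileGroup): Unit = unused.add(group)

      def main(args: Array[String]): Unit = {
        val g1 = checkout()  // fileId 0
        val g2 = checkout()  // fileId 1, because g1 is still checked out
        release(g1)
        val g3 = checkout()  // reuses fileId 0
        println(Seq(g1, g2, g3).map(_.fileId).mkString(","))  // prints 0,1,0
      }
    }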

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
new file mode 100644
index 0000000..8bb9efc
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io._
+import java.nio.ByteBuffer
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.storage._
+
+/**
+ * Create and maintain the shuffle blocks' mapping between logical blocks and
+ * physical file locations.
+ * Data of shuffle blocks from the same map task are stored in a single 
consolidated data file.
+ * The offsets of the data blocks in the data file are stored in a separate 
index file.
+ *
+ * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0,
+ * adding ".data" as the filename postfix for the data file and ".index" as the
+ * filename postfix for the index file.
+ *
+ */
+private[spark]
+class IndexShuffleBlockManager extends ShuffleBlockManager {
+
+  private lazy val blockManager = SparkEnv.get.blockManager
+
+  /**
+   * Mapping to a single shuffleBlockId with reduce ID 0.
+   * */
+  def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = {
+    ShuffleBlockId(shuffleId, mapId, 0)
+  }
+
+  def getDataFile(shuffleId: Int, mapId: Int): File = {
+    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 
0))
+  }
+
+  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
+    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, 
mapId, 0))
+  }
+
+  /**
+   * Remove data file and index file that contain the output data from one map.
+   * */
+  def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
+    var file = getDataFile(shuffleId, mapId)
+    if (file.exists()) {
+      file.delete()
+    }
+
+    file = getIndexFile(shuffleId, mapId)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+
+  /**
+   * Write an index file with the offsets of each block, plus a final offset 
at the end for the
+   * end of the output file. This will be used by getBlockLocation to figure 
out where each block
+   * begins and ends.
+   * */
+  def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]) = {
+    val indexFile = getIndexFile(shuffleId, mapId)
+    val out = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(indexFile)))
+    try {
+      // We take in the lengths of each block and need to convert them to offsets.
+      var offset = 0L
+      out.writeLong(offset)
+
+      for (length <- lengths) {
+        offset += length
+        out.writeLong(offset)
+      }
+    } finally {
+      out.close()
+    }
+  }
+
+  /**
+   * Get the location of a block in a map output file. Uses the index file we 
create for it.
+   * */
+  private def getBlockLocation(blockId: ShuffleBlockId): FileSegment = {
+    // The block is actually going to be a range of a single map output file 
for this map, so
+    // find out the consolidated file, then the offset within that from our 
index
+    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
+
+    val in = new DataInputStream(new FileInputStream(indexFile))
+    try {
+      in.skip(blockId.reduceId * 8)
+      val offset = in.readLong()
+      val nextOffset = in.readLong()
+      new FileSegment(getDataFile(blockId.shuffleId, blockId.mapId), offset, 
nextOffset - offset)
+    } finally {
+      in.close()
+    }
+  }
+
+  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
+    val segment = getBlockLocation(blockId)
+    blockManager.diskStore.getBytes(segment)
+  }
+
+  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, 
ByteBuffer] = {
+    Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]))
+  }
+
+  override def stop() = {}
+}
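
IndexShuffleBlockManager's index file format above is just a sequence of
cumulative offsets, one java long per partition boundary starting at 0, so
reducer r's block occupies bytes [offset(r), offset(r + 1)) of the data file.
A self-contained sketch of that layout (plain Scala and java.io, not Spark API;
names are illustrative):

    import java.io._

    object IndexFileSketch {
      // Mirrors writeIndexFile: convert per-block lengths into cumulative offsets.
      def writeIndex(indexFile: File, lengths: Array[Long]): Unit = {
        val out = new DataOutputStream(
          new BufferedOutputStream(new FileOutputStream(indexFile)))
        try {
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } finally {
          out.close()
        }
      }

      // Mirrors getBlockLocation: skip to the reducer's entry and read two offsets.
      def readSegment(indexFile: File, reduceId: Int): (Long, Long) = {
        val in = new DataInputStream(new FileInputStream(indexFile))
        try {
          in.skip(reduceId * 8L)  // 8 bytes per stored offset
          val offset = in.readLong()
          val nextOffset = in.readLong()
          (offset, nextOffset - offset)  // (offset, length) within the data file
        } finally {
          in.close()
        }
      }

      def main(args: Array[String]): Unit = {
        val f = File.createTempFile("shuffle", ".index")
        writeIndex(f, Array(10L, 0L, 25L))
        println(readSegment(f, 2))  // (10,25): the third block starts at byte 10
        f.delete()
      }
    }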

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
new file mode 100644
index 0000000..4240580
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.storage.{FileSegment, ShuffleBlockId}
+
+private[spark]
+trait ShuffleBlockManager {
+  type ShuffleId = Int
+
+  /**
+   * Get shuffle block data managed by the local ShuffleBlockManager.
+   * @return Some(ByteBuffer) if block found, otherwise None.
+   */
+  def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]
+
+  def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer]
+
+  def stop(): Unit
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 9c859b8..801ae54 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -49,8 +49,13 @@ private[spark] trait ShuffleManager {
       endPartition: Int,
       context: TaskContext): ShuffleReader[K, C]
 
-  /** Remove a shuffle's metadata from the ShuffleManager. */
-  def unregisterShuffle(shuffleId: Int)
+  /**
+    * Remove a shuffle's metadata from the ShuffleManager.
+    * @return true if the metadata was removed successfully, otherwise false.
+    */
+  def unregisterShuffle(shuffleId: Int): Boolean
+
+  def shuffleBlockManager: ShuffleBlockManager
 
   /** Shut down this ShuffleManager. */
   def stop(): Unit

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index df98d18..62e0629 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -25,6 +25,9 @@ import org.apache.spark.shuffle._
  * mapper (possibly reusing these across waves of tasks).
  */
 private[spark] class HashShuffleManager(conf: SparkConf) extends 
ShuffleManager {
+
+  private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+
   /* Register a shuffle with the manager and obtain a handle for it to pass to 
tasks. */
   override def registerShuffle[K, V, C](
       shuffleId: Int,
@@ -49,12 +52,21 @@ private[spark] class HashShuffleManager(conf: SparkConf) 
extends ShuffleManager
   /** Get a writer for a given partition. Called on executors by map tasks. */
   override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
       : ShuffleWriter[K, V] = {
-    new HashShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+    new HashShuffleWriter(
+      shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
   }
 
   /** Remove a shuffle's metadata from the ShuffleManager. */
-  override def unregisterShuffle(shuffleId: Int): Unit = {}
+  override def unregisterShuffle(shuffleId: Int): Boolean = {
+    shuffleBlockManager.removeShuffle(shuffleId)
+  }
+
+  override def shuffleBlockManager: FileShuffleBlockManager = {
+    fileShuffleBlockManager
+  }
 
   /** Shut down this ShuffleManager. */
-  override def stop(): Unit = {}
+  override def stop(): Unit = {
+    shuffleBlockManager.stop()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 51e454d..4b9454d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.shuffle.hash
 
-import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter}
-import org.apache.spark.{Logging, MapOutputTracker, SparkEnv, TaskContext}
-import org.apache.spark.storage.{BlockObjectWriter}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle._
+import org.apache.spark.storage.BlockObjectWriter
 
 private[spark] class HashShuffleWriter[K, V](
+    shuffleBlockManager: FileShuffleBlockManager,
     handle: BaseShuffleHandle[K, V, _],
     mapId: Int,
     context: TaskContext)
@@ -43,7 +44,6 @@ private[spark] class HashShuffleWriter[K, V](
   metrics.shuffleWriteMetrics = Some(writeMetrics)
 
   private val blockManager = SparkEnv.get.blockManager
-  private val shuffleBlockManager = blockManager.shuffleBlockManager
   private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
   private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, 
numOutputSplits, ser,
     writeMetrics)

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 6dcca47..b727438 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.{DataInputStream, FileInputStream}
+import java.util.concurrent.ConcurrentHashMap
 
+import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency}
 import org.apache.spark.shuffle._
-import org.apache.spark.{TaskContext, ShuffleDependency}
 import org.apache.spark.shuffle.hash.HashShuffleReader
-import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId}
 
-private[spark] class SortShuffleManager extends ShuffleManager {
+private[spark] class SortShuffleManager(conf: SparkConf) extends 
ShuffleManager {
+
+  private val indexShuffleBlockManager = new IndexShuffleBlockManager()
+  private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
+
   /**
    * Register a shuffle with the manager and obtain a handle for it to pass to 
tasks.
    */
@@ -52,29 +55,29 @@ private[spark] class SortShuffleManager extends 
ShuffleManager {
   /** Get a writer for a given partition. Called on executors by map tasks. */
   override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
       : ShuffleWriter[K, V] = {
-    new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
+    shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, 
baseShuffleHandle.numMaps)
+    new SortShuffleWriter(
+      shuffleBlockManager, baseShuffleHandle, mapId, context)
   }
 
   /** Remove a shuffle's metadata from the ShuffleManager. */
-  override def unregisterShuffle(shuffleId: Int): Unit = {}
+  override def unregisterShuffle(shuffleId: Int): Boolean = {
+    if (shuffleMapNumber.containsKey(shuffleId)) {
+      val numMaps = shuffleMapNumber.remove(shuffleId)
+      (0 until numMaps).map{ mapId =>
+        shuffleBlockManager.removeDataByMap(shuffleId, mapId)
+      }
+    }
+    true
+  }
 
-  /** Shut down this ShuffleManager. */
-  override def stop(): Unit = {}
+  override def shuffleBlockManager: IndexShuffleBlockManager = {
+    indexShuffleBlockManager
+  }
 
-  /** Get the location of a block in a map output file. Uses the index file we 
create for it. */
-  def getBlockLocation(blockId: ShuffleBlockId, diskManager: 
DiskBlockManager): FileSegment = {
-    // The block is actually going to be a range of a single map output file 
for this map, so
-    // figure out the ID of the consolidated file, then the offset within that 
from our index
-    val consolidatedId = blockId.copy(reduceId = 0)
-    val indexFile = diskManager.getFile(consolidatedId.name + ".index")
-    val in = new DataInputStream(new FileInputStream(indexFile))
-    try {
-      in.skip(blockId.reduceId * 8)
-      val offset = in.readLong()
-      val nextOffset = in.readLong()
-      new FileSegment(diskManager.getFile(consolidatedId), offset, nextOffset 
- offset)
-    } finally {
-      in.close()
-    }
+  /** Shut down this ShuffleManager. */
+  override def stop(): Unit = {
+    shuffleBlockManager.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index b8c9ad4..89a78d6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -17,29 +17,25 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.File
-
 import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, 
BaseShuffleHandle}
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
 
 private[spark] class SortShuffleWriter[K, V, C](
+    shuffleBlockManager: IndexShuffleBlockManager,
     handle: BaseShuffleHandle[K, V, C],
     mapId: Int,
     context: TaskContext)
   extends ShuffleWriter[K, V] with Logging {
 
   private val dep = handle.dependency
-  private val numPartitions = dep.partitioner.numPartitions
 
   private val blockManager = SparkEnv.get.blockManager
 
   private var sorter: ExternalSorter[K, V, _] = null
-  private var outputFile: File = null
-  private var indexFile: File = null
 
   // Are we in the process of stopping? Because map tasks can call stop() with 
success = true
   // and then call stop() with success = false if they get an exception, we 
want to make sure
@@ -69,17 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
       sorter.insertAll(records)
     }
 
-    // Create a single shuffle file with reduce ID 0 that we'll write all 
results to. We'll later
-    // serve different ranges of this file using an index file that we create 
at the end.
-    val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
-
-    outputFile = blockManager.diskBlockManager.getFile(blockId)
-    indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index")
-
-    val partitionLengths = sorter.writePartitionedFile(blockId, context)
-
-    // Register our map output with the ShuffleBlockManager, which handles 
cleaning it over time
-    blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, 
numPartitions)
+    val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
+    val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
+    val partitionLengths = sorter.writePartitionedFile(blockId, context, 
outputFile)
+    shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
 
     mapStatus = new MapStatus(blockManager.blockManagerId,
       partitionLengths.map(MapOutputTracker.compressSize))
@@ -95,13 +84,8 @@ private[spark] class SortShuffleWriter[K, V, C](
       if (success) {
         return Option(mapStatus)
       } else {
-        // The map task failed, so delete our output file if we created one
-        if (outputFile != null) {
-          outputFile.delete()
-        }
-        if (indexFile != null) {
-          indexFile.delete()
-        }
+        // The map task failed, so delete our output data.
+        shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId)
         return None
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index d07e6a1..e35b7fe 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -197,7 +197,7 @@ object BlockFetcherIterator {
       for (id <- localBlocksToFetch) {
         try {
           readMetrics.localBlocksFetched += 1
-          results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, 
serializer).get))
+          results.put(new FetchResult(id, 0, () => getLocalShuffleFromDisk(id, 
serializer).get))
           logDebug("Got local block " + id)
         } catch {
           case e: Exception => {

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c1756ac..a83a3f4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -59,6 +59,11 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Int, 
reduceId: Int) extends Blo
 }
 
 @DeveloperApi
+case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) 
extends BlockId {
+  def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
 case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) 
extends BlockId {
   def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
 }
@@ -92,6 +97,7 @@ private[spark] case class TestBlockId(id: String) extends 
BlockId {
 object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
   val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
+  val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
   val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
@@ -104,6 +110,8 @@ object BlockId {
       RDDBlockId(rddId.toInt, splitIndex.toInt)
     case SHUFFLE(shuffleId, mapId, reduceId) =>
       ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+    case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
+      ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
     case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
       ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
     case BROADCAST(broadcastId, field) =>
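
The new ShuffleDataBlockId added above encodes shuffleId/mapId/reduceId plus a
".data" suffix, and BlockId.apply parses such names back via the SHUFFLE_DATA
pattern. A small round-trip check (assuming a Spark build with this patch on
the classpath; the object name is illustrative):

    import org.apache.spark.storage.{BlockId, ShuffleDataBlockId}

    object ShuffleDataBlockIdSketch {
      def main(args: Array[String]): Unit = {
        val id = ShuffleDataBlockId(shuffleId = 2, mapId = 7, reduceId = 0)
        println(id.name)                 // shuffle_2_7_0.data
        println(BlockId(id.name) == id)  // true: parsed back into the same block id
      }
    }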

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index cfe5b6c..a714142 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -64,8 +64,8 @@ private[spark] class BlockManager(
   extends BlockDataProvider with Logging {
 
   private val port = conf.getInt("spark.blockManager.port", 0)
-  val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
-  val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
+
+  val diskBlockManager = new DiskBlockManager(this, conf)
   val connectionManager =
     new ConnectionManager(port, conf, securityManager, "Connection manager for 
block manager")
 
@@ -83,7 +83,7 @@ private[spark] class BlockManager(
     val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
     val tachyonMaster = conf.get("spark.tachyonStore.url",  
"tachyon://localhost:19998")
     val tachyonBlockManager =
-      new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, 
tachyonMaster)
+      new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
     tachyonInitialized = true
     new TachyonStore(this, tachyonBlockManager)
   }
@@ -215,7 +215,7 @@ private[spark] class BlockManager(
   override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] 
= {
     val bid = BlockId(blockId)
     if (bid.isShuffle) {
-      Left(diskBlockManager.getBlockLocation(bid))
+      
shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId])
     } else {
       val blockBytesOpt = doGetLocal(bid, asBlockResult = 
false).asInstanceOf[Option[ByteBuffer]]
       if (blockBytesOpt.isDefined) {
@@ -333,8 +333,14 @@ private[spark] class BlockManager(
    * shuffle blocks. It is safe to do so without a lock on block info since 
disk store
    * never deletes (recent) items.
    */
-  def getLocalFromDisk(blockId: BlockId, serializer: Serializer): 
Option[Iterator[Any]] = {
-    diskStore.getValues(blockId, serializer).orElse {
+  def getLocalShuffleFromDisk(
+      blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
+
+    val shuffleBlockManager = shuffleManager.shuffleBlockManager
+    val values = 
shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]).map(
+      bytes => this.dataDeserialize(blockId, bytes, serializer))
+
+    values.orElse {
       throw new BlockException(blockId, s"Block $blockId not found on disk, 
though it should be")
     }
   }
@@ -355,7 +361,8 @@ private[spark] class BlockManager(
     // As an optimization for map output fetches, if the block is for a 
shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items 
so this should work
     if (blockId.isShuffle) {
-      diskStore.getBytes(blockId) match {
+      val shuffleBlockManager = shuffleManager.shuffleBlockManager
+      shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match 
{
         case Some(bytes) =>
           Some(bytes)
         case None =>
@@ -1045,7 +1052,6 @@ private[spark] class BlockManager(
 
   def stop(): Unit = {
     connectionManager.stop()
-    shuffleBlockManager.stop()
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index c194e0f..14ae2f3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -21,7 +21,7 @@ import scala.concurrent.Future
 
 import akka.actor.{ActorRef, Actor}
 
-import org.apache.spark.{Logging, MapOutputTracker}
+import org.apache.spark.{Logging, MapOutputTracker, SparkEnv}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.ActorLogReceive
 
@@ -55,7 +55,7 @@ class BlockManagerSlaveActor(
         if (mapOutputTracker != null) {
           mapOutputTracker.unregisterShuffle(shuffleId)
         }
-        blockManager.shuffleBlockManager.removeShuffle(shuffleId)
+        SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
       }
 
     case RemoveBroadcast(broadcastId, tellMaster) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index ec022ce..a715594 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -21,11 +21,9 @@ import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Date, Random, UUID}
 
-import org.apache.spark.{SparkConf, SparkEnv, Logging}
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.network.netty.PathResolver
 import org.apache.spark.util.Utils
-import org.apache.spark.shuffle.sort.SortShuffleManager
 
 /**
  * Creates and maintains the logical mapping between logical blocks and 
physical on-disk
@@ -36,13 +34,11 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
  * Block files are hashed among the directories listed in spark.local.dir (or 
in
  * SPARK_LOCAL_DIRS, if it's set).
  */
-private[spark] class DiskBlockManager(shuffleBlockManager: 
ShuffleBlockManager, conf: SparkConf)
-  extends PathResolver with Logging {
+private[spark] class DiskBlockManager(blockManager: BlockManager, conf: 
SparkConf)
+  extends Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
-
-  private val subDirsPerLocalDir =
-    shuffleBlockManager.conf.getInt("spark.diskStore.subDirectories", 64)
+  private val subDirsPerLocalDir = 
blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
 
   /* Create one local directory for each path mentioned in spark.local.dir; 
then, inside this
    * directory, create multiple subdirectories that we will hash files into, 
in order to avoid
@@ -56,26 +52,6 @@ private[spark] class DiskBlockManager(shuffleBlockManager: 
ShuffleBlockManager,
 
   addShutdownHook()
 
-  /**
-   * Returns the physical file segment in which the given BlockId is located. 
If the BlockId has
-   * been mapped to a specific FileSegment by the shuffle layer, that will be 
returned.
-   * Otherwise, we assume the Block is mapped to the whole file identified by 
the BlockId.
-   */
-  def getBlockLocation(blockId: BlockId): FileSegment = {
-    val env = SparkEnv.get  // NOTE: can be null in unit tests
-    if (blockId.isShuffle && env != null && 
env.shuffleManager.isInstanceOf[SortShuffleManager]) {
-      // For sort-based shuffle, let it figure out its blocks
-      val sortShuffleManager = 
env.shuffleManager.asInstanceOf[SortShuffleManager]
-      
sortShuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId], this)
-    } else if (blockId.isShuffle && 
shuffleBlockManager.consolidateShuffleFiles) {
-      // For hash-based shuffle with consolidated files, ShuffleBlockManager 
takes care of this
-      
shuffleBlockManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
-    } else {
-      val file = getFile(blockId.name)
-      new FileSegment(file, 0, file.length())
-    }
-  }
-
   def getFile(filename: String): File = {
     // Figure out which local directory it hashes to, and which subdirectory 
in that
     val hash = Utils.nonNegativeHash(filename)
@@ -105,7 +81,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: 
ShuffleBlockManager,
 
   /** Check if disk block manager has a block. */
   def containsBlock(blockId: BlockId): Boolean = {
-    getBlockLocation(blockId).file.exists()
+    getFile(blockId.name).exists()
   }
 
   /** List all the files currently stored on disk by the disk manager. */

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c83261d..e9304f6 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, RandomAccessFile}
+import java.io.{File, FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
@@ -34,7 +34,7 @@ private[spark] class DiskStore(blockManager: BlockManager, 
diskManager: DiskBloc
   val minMemoryMapBytes = 
blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
 
   override def getSize(blockId: BlockId): Long = {
-    diskManager.getBlockLocation(blockId).length
+    diskManager.getFile(blockId.name).length
   }
 
   override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: 
StorageLevel): PutResult = {
@@ -89,25 +89,33 @@ private[spark] class DiskStore(blockManager: BlockManager, 
diskManager: DiskBloc
     }
   }
 
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val segment = diskManager.getBlockLocation(blockId)
-    val channel = new RandomAccessFile(segment.file, "r").getChannel
+  private def getBytes(file: File, offset: Long, length: Long): 
Option[ByteBuffer] = {
+    val channel = new RandomAccessFile(file, "r").getChannel
 
     try {
       // For small files, directly read rather than memory map
-      if (segment.length < minMemoryMapBytes) {
-        val buf = ByteBuffer.allocate(segment.length.toInt)
-        channel.read(buf, segment.offset)
+      if (length < minMemoryMapBytes) {
+        val buf = ByteBuffer.allocate(length.toInt)
+        channel.read(buf, offset)
         buf.flip()
         Some(buf)
       } else {
-        Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
+        Some(channel.map(MapMode.READ_ONLY, offset, length))
       }
     } finally {
       channel.close()
     }
   }
 
+  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+    val file = diskManager.getFile(blockId.name)
+    getBytes(file, 0, file.length)
+  }
+
+  def getBytes(segment: FileSegment): Option[ByteBuffer] = {
+    getBytes(segment.file, segment.offset, segment.length)
+  }
+
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, 
buffer))
   }
@@ -117,24 +125,25 @@ private[spark] class DiskStore(blockManager: 
BlockManager, diskManager: DiskBloc
    * shuffle short-circuit code.
    */
   def getValues(blockId: BlockId, serializer: Serializer): 
Option[Iterator[Any]] = {
+    // TODO: Should bypass getBytes and use a stream based implementation, so 
that
+    // we won't use a lot of memory during e.g. external sort merge.
     getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, 
bytes, serializer))
   }
 
   override def remove(blockId: BlockId): Boolean = {
-    val fileSegment = diskManager.getBlockLocation(blockId)
-    val file = fileSegment.file
-    if (file.exists() && file.length() == fileSegment.length) {
+    val file = diskManager.getFile(blockId.name)
+    // If consolidation mode is used with HashShuffleManager, the physical filename for the block
+    // is different from blockId.name. So the file returned here will not exist, and we thus avoid
+    // deleting the whole consolidated file by mistake.
+    if (file.exists()) {
       file.delete()
     } else {
-      if (fileSegment.length < file.length()) {
-        logWarning(s"Could not delete block associated with only a part of a 
file: $blockId")
-      }
       false
     }
   }
 
   override def contains(blockId: BlockId): Boolean = {
-    val file = diskManager.getBlockLocation(blockId).file
+    val file = diskManager.getFile(blockId.name)
     file.exists()
   }
 }
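
The refactored DiskStore.getBytes above reads a (file, offset, length) segment
either directly into a heap buffer or via a memory map, depending on
spark.storage.memoryMapThreshold (default 2 * 4096 bytes). A standalone sketch
of that read-vs-mmap decision (plain Scala and java.nio, not Spark API; the
object name is made up):

    import java.io.{File, RandomAccessFile}
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel.MapMode

    object ReadOrMmapSketch {
      // Mirrors the default of spark.storage.memoryMapThreshold.
      val minMemoryMapBytes: Long = 2 * 4096L

      def getBytes(file: File, offset: Long, length: Long): ByteBuffer = {
        val channel = new RandomAccessFile(file, "r").getChannel
        try {
          if (length < minMemoryMapBytes) {
            // Small segments: read into a heap buffer to avoid the mmap overhead.
            val buf = ByteBuffer.allocate(length.toInt)
            channel.read(buf, offset)
            buf.flip()
            buf
          } else {
            // Large segments: memory-map the range instead of copying it.
            channel.map(MapMode.READ_ONLY, offset, length)
          }
        } finally {
          channel.close()
        }
      }
    }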

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
deleted file mode 100644
index b8f5d3a..0000000
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.io.File
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConversions._
-
-import org.apache.spark.Logging
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ShuffleManager
-import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, 
TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, 
PrimitiveVector}
-import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.executor.ShuffleWriteMetrics
-
-/** A group of writers for a ShuffleMapTask, one writer per reducer. */
-private[spark] trait ShuffleWriterGroup {
-  val writers: Array[BlockObjectWriter]
-
-  /** @param success Indicates all writes were successful. If false, no blocks 
will be recorded. */
-  def releaseWriters(success: Boolean)
-}
-
-/**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle 
task gets one file
- * per reducer (this set of files is called a ShuffleFileGroup).
- *
- * As an optimization to reduce the number of physical shuffle files produced, 
multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle 
file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing 
to its shuffle
- * files, it releases them for another task.
- * Regarding the implementation of this feature, shuffle files are identified 
by a 3-tuple:
- *   - shuffleId: The unique id given to the entire shuffle stage.
- *   - bucketId: The id of the output partition (i.e., reducer id)
- *   - fileId: The unique id identifying a group of "combined shuffle files." 
Only one task at a
- *       time owns a particular fileId, and this id is returned to a pool when 
the task finishes.
- * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple 
(file, offset, length)
- * that specifies where in a given file the actual block data is located.
- *
- * Shuffle file metadata is stored in a space-efficient manner. Rather than 
simply mapping
- * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a 
list of offsets for
- * each block stored in each file. In order to find the location of a shuffle 
block, we search the
- * files within a ShuffleFileGroups associated with the block's reducer.
- */
-// TODO: Factor this into a separate class for each ShuffleManager 
implementation
-private[spark]
-class ShuffleBlockManager(blockManager: BlockManager,
-                          shuffleManager: ShuffleManager) extends Logging {
-  def conf = blockManager.conf
-
-  // Turning off shuffle file consolidation causes all shuffle Blocks to get 
their own file.
-  // TODO: Remove this once the shuffle file consolidation feature is stable.
-  val consolidateShuffleFiles =
-    conf.getBoolean("spark.shuffle.consolidateFiles", false)
-
-  // Are we using sort-based shuffle?
-  val sortBasedShuffle = shuffleManager.isInstanceOf[SortShuffleManager]
-
-  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 
1024
-
-  /**
-   * Contains all the state related to a particular shuffle. This includes a 
pool of unused
-   * ShuffleFileGroups, as well as all ShuffleFileGroups that have been 
created for the shuffle.
-   */
-  private class ShuffleState(val numBuckets: Int) {
-    val nextFileId = new AtomicInteger(0)
-    val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-    val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-
-    /**
-     * The mapIds of all map tasks completed on this Executor for this shuffle.
-     * NB: This is only populated if consolidateShuffleFiles is FALSE. We 
don't need it otherwise.
-     */
-    val completedMapTasks = new ConcurrentLinkedQueue[Int]()
-  }
-
-  type ShuffleId = Int
-  private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
-
-  private val metadataCleaner =
-    new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, 
this.cleanup, conf)
-
-  /**
-   * Register a completed map without getting a ShuffleWriterGroup. Used by 
sort-based shuffle
-   * because it just writes a single file by itself.
-   */
-  def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = {
-    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
-    val shuffleState = shuffleStates(shuffleId)
-    shuffleState.completedMapTasks.add(mapId)
-  }
-
-  /**
-   * Get a ShuffleWriterGroup for the given map task, which will register it 
as complete
-   * when the writers are closed successfully
-   */
-  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: 
Serializer,
-      writeMetrics: ShuffleWriteMetrics) = {
-    new ShuffleWriterGroup {
-      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
-      private val shuffleState = shuffleStates(shuffleId)
-      private var fileGroup: ShuffleFileGroup = null
-
-      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
-        fileGroup = getUnusedFileGroup()
-        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, 
bufferSize,
-            writeMetrics)
-        }
-      } else {
-        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          val blockFile = blockManager.diskBlockManager.getFile(blockId)
-          // Because of previous failures, the shuffle file may already exist on this machine.
-          // If so, remove it.
-          if (blockFile.exists) {
-            if (blockFile.delete()) {
-              logInfo(s"Removed existing shuffle file $blockFile")
-            } else {
-              logWarning(s"Failed to remove existing shuffle file $blockFile")
-            }
-          }
-          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
-        }
-      }
-
-      override def releaseWriters(success: Boolean) {
-        if (consolidateShuffleFiles) {
-          if (success) {
-            val offsets = writers.map(_.fileSegment().offset)
-            val lengths = writers.map(_.fileSegment().length)
-            fileGroup.recordMapOutput(mapId, offsets, lengths)
-          }
-          recycleFileGroup(fileGroup)
-        } else {
-          shuffleState.completedMapTasks.add(mapId)
-        }
-      }
-
-      private def getUnusedFileGroup(): ShuffleFileGroup = {
-        val fileGroup = shuffleState.unusedFileGroups.poll()
-        if (fileGroup != null) fileGroup else newFileGroup()
-      }
-
-      private def newFileGroup(): ShuffleFileGroup = {
-        val fileId = shuffleState.nextFileId.getAndIncrement()
-        val files = Array.tabulate[File](numBuckets) { bucketId =>
-          val filename = physicalFileName(shuffleId, bucketId, fileId)
-          blockManager.diskBlockManager.getFile(filename)
-        }
-        val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files)
-        shuffleState.allFileGroups.add(fileGroup)
-        fileGroup
-      }
-
-      private def recycleFileGroup(group: ShuffleFileGroup) {
-        shuffleState.unusedFileGroups.add(group)
-      }
-    }
-  }
-
-  /**
-   * Returns the physical file segment in which the given BlockId is located.
-   * This function should only be called if shuffle file consolidation is enabled, as it is
-   * an error condition if we don't find the expected block.
-   */
-  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
-    // Search all file groups associated with this shuffle.
-    val shuffleState = shuffleStates(id.shuffleId)
-    for (fileGroup <- shuffleState.allFileGroups) {
-      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
-      if (segment.isDefined) { return segment.get }
-    }
-    throw new IllegalStateException("Failed to find shuffle block: " + id)
-  }
-
-  /** Remove all the blocks / files and metadata related to a particular shuffle. */
-  def removeShuffle(shuffleId: ShuffleId): Boolean = {
-    // Do not change the ordering of this, if shuffleStates should be removed only
-    // after the corresponding shuffle blocks have been removed
-    val cleaned = removeShuffleBlocks(shuffleId)
-    shuffleStates.remove(shuffleId)
-    cleaned
-  }
-
-  /** Remove all the blocks / files related to a particular shuffle. */
-  private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
-    shuffleStates.get(shuffleId) match {
-      case Some(state) =>
-        if (sortBasedShuffle) {
-          // There's a single block ID for each map, plus an index file for it
-          for (mapId <- state.completedMapTasks) {
-            val blockId = new ShuffleBlockId(shuffleId, mapId, 0)
-            blockManager.diskBlockManager.getFile(blockId).delete()
-            blockManager.diskBlockManager.getFile(blockId.name + ".index").delete()
-          }
-        } else if (consolidateShuffleFiles) {
-          for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
-            file.delete()
-          }
-        } else {
-          for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
-            val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-            blockManager.diskBlockManager.getFile(blockId).delete()
-          }
-        }
-        logInfo("Deleted all files for shuffle " + shuffleId)
-        true
-      case None =>
-        logInfo("Could not find files for shuffle " + shuffleId + " for deleting")
-        false
-    }
-  }
-
-  private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
-    "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
-  }
-
-  private def cleanup(cleanupTime: Long) {
-    shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
-  }
-
-  def stop() {
-    metadataCleaner.cancel()
-  }
-}
-
-private[spark]
-object ShuffleBlockManager {
-  /**
-   * A group of shuffle files, one per reducer.
-   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
-   */
-  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
-    private var numBlocks: Int = 0
-
-    /**
-     * Stores the absolute index of each mapId in the files of this group. For instance,
-     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
-     */
-    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
-
-    /**
-     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
-     * position in the file.
-     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
-     * reducer.
-     */
-    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
-      new PrimitiveVector[Long]()
-    }
-    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
-      new PrimitiveVector[Long]()
-    }
-
-    def apply(bucketId: Int) = files(bucketId)
-
-    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
-      assert(offsets.length == lengths.length)
-      mapIdToIndex(mapId) = numBlocks
-      numBlocks += 1
-      for (i <- 0 until offsets.length) {
-        blockOffsetsByReducer(i) += offsets(i)
-        blockLengthsByReducer(i) += lengths(i)
-      }
-    }
-
-    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
-    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
-      val file = files(reducerId)
-      val blockOffsets = blockOffsetsByReducer(reducerId)
-      val blockLengths = blockLengthsByReducer(reducerId)
-      val index = mapIdToIndex.getOrElse(mapId, -1)
-      if (index >= 0) {
-        val offset = blockOffsets(index)
-        val length = blockLengths(index)
-        Some(new FileSegment(file, offset, length))
-      } else {
-        None
-      }
-    }
-  }
-}
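
The core of the removed class above is the per-group bookkeeping: each ShuffleFileGroup keeps one
offset vector and one length vector per reducer file, and mapIdToIndex records which slot in those
vectors belongs to each map task, so getBlockLocation can turn a ShuffleBlockId into a FileSegment
by scanning the shuffle's file groups. A minimal, self-contained Scala sketch of that scheme follows;
it uses plain Scala collections instead of Spark's PrimitiveVector/PrimitiveKeyOpenHashMap, and the
names SegmentSketch, FileGroupSketch and segmentFor are illustrative only, not Spark API.

import java.io.File
import scala.collection.mutable

// Stand-in for org.apache.spark.storage.FileSegment: (file, offset, length).
case class SegmentSketch(file: File, offset: Long, length: Long)

// One consolidated group of shuffle files, one file per reducer (cf. ShuffleFileGroup above).
class FileGroupSketch(files: Array[File]) {
  private var numBlocks = 0
  private val mapIdToIndex = mutable.Map.empty[Int, Int]
  // Per reducer file: offsets and lengths of each map task's block, in write order.
  private val offsetsByReducer = Array.fill(files.length)(mutable.ArrayBuffer.empty[Long])
  private val lengthsByReducer = Array.fill(files.length)(mutable.ArrayBuffer.empty[Long])

  // Called once per finished map task with one (offset, length) pair per reducer file.
  def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]): Unit = {
    require(offsets.length == files.length && lengths.length == files.length)
    mapIdToIndex(mapId) = numBlocks
    numBlocks += 1
    for (r <- files.indices) {
      offsetsByReducer(r) += offsets(r)
      lengthsByReducer(r) += lengths(r)
    }
  }

  // The lookup that getBlockLocation/getFileSegmentFor perform above: index, then slice.
  def segmentFor(mapId: Int, reducerId: Int): Option[SegmentSketch] =
    mapIdToIndex.get(mapId).map { idx =>
      SegmentSketch(files(reducerId), offsetsByReducer(reducerId)(idx), lengthsByReducer(reducerId)(idx))
    }
}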

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index a6cbe3a..6908a59 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.Utils
  * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
  */
 private[spark] class TachyonBlockManager(
-    shuffleManager: ShuffleBlockManager,
+    blockManager: BlockManager,
     rootDirs: String,
     val master: String)
   extends Logging {
@@ -49,7 +49,7 @@ private[spark] class TachyonBlockManager(
 
   private val MAX_DIR_CREATION_ATTEMPTS = 10
   private val subDirsPerTachyonDir =
-    shuffleManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt
+    blockManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt
 
   // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
   // then, inside this directory, create multiple subdirectories that we will hash files into,

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 5d8a648..782b979 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -719,20 +719,20 @@ private[spark] class ExternalSorter[K, V, C](
   def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
 
   /**
-   * Write all the data added into this ExternalSorter into a file in the disk store, creating
-   * an .index file for it as well with the offsets of each partition. This is called by the
-   * SortShuffleWriter and can go through an efficient path of just concatenating binary files
-   * if we decided to avoid merge-sorting.
+   * Write all the data added into this ExternalSorter into a file in the disk store. This is
+   * called by the SortShuffleWriter and can go through an efficient path of just concatenating
+   * binary files if we decided to avoid merge-sorting.
    *
    * @param blockId block ID to write to. The index file will be blockId.name + ".index".
    * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
    * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
    */
-  def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = {
-    val outputFile = blockManager.diskBlockManager.getFile(blockId)
+  def writePartitionedFile(
+      blockId: BlockId,
+      context: TaskContext,
+      outputFile: File): Array[Long] = {
 
     // Track location of each range in the output file
-    val offsets = new Array[Long](numPartitions + 1)
     val lengths = new Array[Long](numPartitions)
 
     if (bypassMergeSort && partitionWriters != null) {
@@ -750,7 +750,6 @@ private[spark] class ExternalSorter[K, V, C](
           in.close()
           in = null
           lengths(i) = size
-          offsets(i + 1) = offsets(i) + lengths(i)
         }
       } finally {
         if (out != null) {
@@ -772,11 +771,7 @@ private[spark] class ExternalSorter[K, V, C](
           }
           writer.commitAndClose()
           val segment = writer.fileSegment()
-          offsets(id + 1) = segment.offset + segment.length
           lengths(id) = segment.length
-        } else {
-          // The partition is empty; don't create a new writer to avoid writing headers, etc
-          offsets(id + 1) = offsets(id)
         }
       }
     }
@@ -784,23 +779,6 @@ private[spark] class ExternalSorter[K, V, C](
     context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
     context.taskMetrics.diskBytesSpilled += diskBytesSpilled
 
-    // Write an index file with the offsets of each block, plus a final offset at the end for the
-    // end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
-    // out where each block begins and ends.
-
-    val diskBlockManager = blockManager.diskBlockManager
-    val indexFile = diskBlockManager.getFile(blockId.name + ".index")
-    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
-    try {
-      var i = 0
-      while (i < numPartitions + 1) {
-        out.writeLong(offsets(i))
-        i += 1
-      }
-    } finally {
-      out.close()
-    }
-
     lengths
   }
 
@@ -811,7 +789,7 @@ private[spark] class ExternalSorter[K, V, C](
     if (writer.isOpen) {
       writer.commitAndClose()
     }
-    blockManager.getLocalFromDisk(writer.blockId, ser).get.asInstanceOf[Iterator[Product2[K, C]]]
+    blockManager.diskStore.getValues(writer.blockId, ser).get.asInstanceOf[Iterator[Product2[K, C]]]
   }
 
   def stop(): Unit = {
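
The hunk above removes the index-file writing from writePartitionedFile. Per the removed comment,
that index is what SortShuffleManager.getBlockLocation reads: numPartitions + 1 longs of cumulative
byte offsets, starting at 0, stored next to the data file as blockId.name + ".index". A minimal
sketch of writing and reading that layout, under the assumption that it stays exactly as shown in
the removed code; writeIndex and readPartitionRange are illustrative helpers, not Spark API.

import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

// Writes the cumulative-offset index format removed above: numPartitions + 1 longs,
// where entry i is the byte offset at which partition i starts (entry 0 is 0).
def writeIndex(indexFile: File, lengths: Array[Long]): Unit = {
  val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
  try {
    var offset = 0L
    out.writeLong(offset)
    for (len <- lengths) {
      offset += len
      out.writeLong(offset)
    }
  } finally {
    out.close()
  }
}

// Recovers the (start, end) byte range of one partition from such an index file.
def readPartitionRange(indexFile: File, partition: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    var i = 0
    while (i < partition) { in.readLong(); i += 1 }
    val start = in.readLong()
    val end = in.readLong()
    (start, end)
  } finally {
    in.close()
  }
}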

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
new file mode 100644
index 0000000..6061e54
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.hash
+
+import java.io.{File, FileWriter}
+
+import scala.language.reflectiveCalls
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.shuffle.FileShuffleBlockManager
+import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
+
+class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
+  private val testConf = new SparkConf(false)
+
+  private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+    assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+    assert (segment1.offset === segment2.offset)
+    assert (segment1.length === segment2.length)
+  }
+
+  test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
+
+    val conf = new SparkConf(false)
+    // reset after EACH object write. This is to ensure that there are bytes appended after
+    // an object is written. So if the codepaths assume writeObject is end of data, this should
+    // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc.
+    conf.set("spark.serializer.objectStreamReset", "1")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+
+    sc = new SparkContext("local", "test", conf)
+
+    val shuffleBlockManager =
+      SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager]
+
+    val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
+      new ShuffleWriteMetrics)
+    for (writer <- shuffle1.writers) {
+      writer.write("test1")
+      writer.write("test2")
+    }
+    for (writer <- shuffle1.writers) {
+      writer.commitAndClose()
+    }
+
+    val shuffle1Segment = shuffle1.writers(0).fileSegment()
+    shuffle1.releaseWriters(success = true)
+
+    val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
+      new ShuffleWriteMetrics)
+
+    for (writer <- shuffle2.writers) {
+      writer.write("test3")
+      writer.write("test4")
+    }
+    for (writer <- shuffle2.writers) {
+      writer.commitAndClose()
+    }
+    val shuffle2Segment = shuffle2.writers(0).fileSegment()
+    shuffle2.releaseWriters(success = true)
+
+    // Now comes the test :
+    // Write to shuffle 3 and close it, but before registering it, check if the file lengths for
+    // the previous task (for shuffle1) are the same as 'segments'. Earlier, we were inferring the
+    // length of a block based on the remaining data in the file, which could mess things up when
+    // there are concurrent reads and writes happening to the same shuffle group.
+
+    val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
+      new ShuffleWriteMetrics)
+    for (writer <- shuffle3.writers) {
+      writer.write("test3")
+      writer.write("test4")
+    }
+    for (writer <- shuffle3.writers) {
+      writer.commitAndClose()
+    }
+    // check before we register.
+    checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get)
+    shuffle3.releaseWriters(success = true)
+    checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get)
+    shuffleBlockManager.removeShuffle(1)
+
+  }
+
+
+  def writeToFile(file: File, numBytes: Int) {
+    val writer = new FileWriter(file, true)
+    for (i <- 0 until numBytes) writer.write(i)
+    writer.close()
+  }
+}
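
The assertions above read a written block back with getBlockData(...).left.get, so
FileShuffleBlockManager.getBlockData evidently returns an Either whose Left side is the on-disk
FileSegment; the Right side is assumed below to be an in-memory ByteBuffer, which this diff does
not show. A small sketch of consuming that shape, mirroring the suite's imports; describeBlockData
is an illustrative helper, not Spark API.

import java.nio.ByteBuffer
import org.apache.spark.storage.FileSegment  // same import the suite uses above

// Assumes the result shape Either[FileSegment, ByteBuffer]; only the Left/FileSegment side
// is confirmed by the test above, the Right side is an assumption.
def describeBlockData(data: Either[FileSegment, ByteBuffer]): String = data match {
  case Left(segment) =>
    s"on disk in ${segment.file.getName} at offset ${segment.offset}, ${segment.length} bytes"
  case Right(buffer) =>
    s"in memory, ${buffer.remaining()} bytes"
}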

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
index fbfcb51..3c86f6b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
@@ -60,11 +60,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     }
 
     // 3rd block is going to fail
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any())
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any())
-    doAnswer(answer).when(blockManager).getLocalFromDisk(meq(blIds(2)), any())
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any())
+    doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any())
 
     val bmId = BlockManagerId("test-client", "test-client", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
@@ -76,24 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
 
     iterator.initialize()
 
-    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
-    verify(blockManager, times(0)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk.
+    verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
     assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined")
-    verify(blockManager, times(1)).getLocalFromDisk(any(), any())
+    verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
     assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined")
-    verify(blockManager, times(2)).getLocalFromDisk(any(), any())
+    verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
     // 3rd fetch should be failed
     intercept[Exception] {
       iterator.next()
     }
-    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
+    verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any())
   }
 
 
@@ -115,11 +115,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     val optItr = mock(classOf[Option[Iterator[Any]]])
  
    // All blocks should be fetched successfully
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any())
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any())
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(2)), any())
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
-    doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any())
+    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any())
 
     val bmId = BlockManagerId("test-client", "test-client", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
@@ -131,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
 
     iterator.initialize()
 
-    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
-    verify(blockManager, times(0)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk.
+    verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined")
@@ -145,7 +145,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
 
-    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
+    verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any())
   }
 
   test("block fetch from remote fails using BasicBlockFetcherIterator") {

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bdcea07..14ffada 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -49,6 +49,7 @@ import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.language.postfixOps
+import org.apache.spark.shuffle.ShuffleBlockManager
 
 class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   with PrivateMethodTester {
@@ -823,11 +824,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // be nice to refactor classes involved in disk storage in a way that
     // allows for easier testing.
     val blockManager = mock(classOf[BlockManager])
-    val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
-    when(shuffleBlockManager.conf).thenReturn(conf)
-    val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
-
     when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
+    val diskBlockManager = new DiskBlockManager(blockManager, conf)
+
     val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
     diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
     val mapped = diskStoreMapped.getBytes(blockId).get

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aabaead..26082de 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -26,6 +26,7 @@ import scala.language.reflectiveCalls
 
 import akka.actor.Props
 import com.google.common.io.Files
+import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.SparkConf
@@ -40,18 +41,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
   private var rootDir1: File = _
   private var rootDirs: String = _
 
-  // This suite focuses primarily on consolidation features,
-  // so we coerce consolidation if not already enabled.
-  testConf.set("spark.shuffle.consolidateFiles", "true")
-
-  private val shuffleManager = new HashShuffleManager(testConf.clone)
-
-  val shuffleBlockManager = new ShuffleBlockManager(null, shuffleManager) {
-    override def conf = testConf.clone
-    var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
-    override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
-  }
-
+  val blockManager = mock(classOf[BlockManager])
+  when(blockManager.conf).thenReturn(testConf)
   var diskBlockManager: DiskBlockManager = _
 
   override def beforeAll() {
@@ -73,22 +64,17 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
   override def beforeEach() {
     val conf = testConf.clone
     conf.set("spark.local.dir", rootDirs)
-    diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
-    shuffleBlockManager.idToSegmentMap.clear()
+    diskBlockManager = new DiskBlockManager(blockManager, conf)
   }
 
   override def afterEach() {
     diskBlockManager.stop()
-    shuffleBlockManager.idToSegmentMap.clear()
   }
 
   test("basic block creation") {
     val blockId = new TestBlockId("test")
-    assertSegmentEquals(blockId, blockId.name, 0, 0)
-
     val newFile = diskBlockManager.getFile(blockId)
     writeToFile(newFile, 10)
-    assertSegmentEquals(blockId, blockId.name, 0, 10)
     assert(diskBlockManager.containsBlock(blockId))
     newFile.delete()
     assert(!diskBlockManager.containsBlock(blockId))
@@ -101,127 +87,6 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
     assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
   }
 
-  test("block appending") {
-    val blockId = new TestBlockId("test")
-    val newFile = diskBlockManager.getFile(blockId)
-    writeToFile(newFile, 15)
-    assertSegmentEquals(blockId, blockId.name, 0, 15)
-    val newFile2 = diskBlockManager.getFile(blockId)
-    assert(newFile === newFile2)
-    writeToFile(newFile2, 12)
-    assertSegmentEquals(blockId, blockId.name, 0, 27)
-    newFile.delete()
-  }
-
-  test("block remapping") {
-    val filename = "test"
-    val blockId0 = new ShuffleBlockId(1, 2, 3)
-    val newFile = diskBlockManager.getFile(filename)
-    writeToFile(newFile, 15)
-    shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15)
-    assertSegmentEquals(blockId0, filename, 0, 15)
-
-    val blockId1 = new ShuffleBlockId(1, 2, 4)
-    val newFile2 = diskBlockManager.getFile(filename)
-    writeToFile(newFile2, 12)
-    shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12)
-    assertSegmentEquals(blockId1, filename, 15, 12)
-
-    assert(newFile === newFile2)
-    newFile.delete()
-  }
-
-  private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
-    assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
-    assert (segment1.offset === segment2.offset)
-    assert (segment1.length === segment2.length)
-  }
-
-  test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
-
-    val serializer = new JavaSerializer(testConf)
-    val confCopy = testConf.clone
-    // reset after EACH object write. This is to ensure that there are bytes appended after
-    // an object is written. So if the codepaths assume writeObject is end of data, this should
-    // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc.
-    confCopy.set("spark.serializer.objectStreamReset", "1")
-
-    val securityManager = new org.apache.spark.SecurityManager(confCopy)
-    // Do not use the shuffleBlockManager above !
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
-      securityManager)
-    val master = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
-      confCopy)
-    val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy,
-      securityManager, null, shuffleManager)
-
-    try {
-
-      val shuffleManager = store.shuffleBlockManager
-
-      val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer, new ShuffleWriteMetrics)
-      for (writer <- shuffle1.writers) {
-        writer.write("test1")
-        writer.write("test2")
-      }
-      for (writer <- shuffle1.writers) {
-        writer.commitAndClose()
-      }
-
-      val shuffle1Segment = shuffle1.writers(0).fileSegment()
-      shuffle1.releaseWriters(success = true)
-
-      val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf),
-        new ShuffleWriteMetrics)
-
-      for (writer <- shuffle2.writers) {
-        writer.write("test3")
-        writer.write("test4")
-      }
-      for (writer <- shuffle2.writers) {
-        writer.commitAndClose()
-      }
-      val shuffle2Segment = shuffle2.writers(0).fileSegment()
-      shuffle2.releaseWriters(success = true)
-
-      // Now comes the test :
-      // Write to shuffle 3; and close it, but before registering it, check if the file lengths for
-      // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length
-      // of block based on remaining data in file : which could mess things up when there is concurrent read
-      // and writes happening to the same shuffle group.
-
-      val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
-        new ShuffleWriteMetrics)
-      for (writer <- shuffle3.writers) {
-        writer.write("test3")
-        writer.write("test4")
-      }
-      for (writer <- shuffle3.writers) {
-        writer.commitAndClose()
-      }
-      // check before we register.
-      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
-      shuffle3.releaseWriters(success = true)
-      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
-      shuffleManager.removeShuffle(1)
-    } finally {
-
-      if (store != null) {
-        store.stop()
-      }
-      actorSystem.shutdown()
-      actorSystem.awaitTermination()
-    }
-  }
-
-  def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
-    val segment = diskBlockManager.getBlockLocation(blockId)
-    assert(segment.file.getName === filename)
-    assert(segment.offset === offset)
-    assert(segment.length === length)
-  }
-
   def writeToFile(file: File, numBytes: Int) {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)

http://git-wip-us.apache.org/repos/asf/spark/blob/acea9280/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3005893..fe8ffe6 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -59,6 +59,8 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
             ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.DiskStore.getValues"),
+            ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.storage.MemoryStore.Entry")
           ) ++
           Seq(


