Repository: spark
Updated Branches:
  refs/heads/master b270309d7 -> 78f2af582


SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation

All changes from this PR are by mridulm and are drawn from his work in #1609. 
This patch is intended to fix all major issues related to shuffle file 
consolidation that mridulm found, while minimizing changes to the code, with 
the hope that it may be more easily merged into 1.1.

This patch is **not** intended as a replacement for #1609, which provides many 
additional benefits, including fixes to ExternalAppendOnlyMap, improvements to 
DiskBlockObjectWriter's API, and several new unit tests.

If it is feasible to merge #1609 for the 1.1 deadline, that is a preferable 
option.

Author: Aaron Davidson <aa...@databricks.com>

Closes #1678 from aarondav/consol and squashes the following commits:

53b3f6d [Aaron Davidson] Correct behavior when writing unopened file
701d045 [Aaron Davidson] Rebase with sort-based shuffle
9160149 [Aaron Davidson] SPARK-2532: Minimal shuffle consolidation fixes
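
The core of the fix is the writer lifecycle: every shuffle writer opened for a map task must end in exactly one of commitAndClose() or revertPartialWritesAndClose(). As a rough illustration (a standalone Scala sketch, not Spark source; SimpleWriter and writeTask are made-up names), the commit-or-revert pattern the patch enforces looks like this:

// Illustrative sketch only: every writer ends in exactly one of
// commitAndClose() or revertPartialWritesAndClose().
trait SimpleWriter {
  def write(value: Any): Unit
  def commitAndClose(): Unit
  def revertPartialWritesAndClose(): Unit
}

def writeTask(writers: IndexedSeq[SimpleWriter], records: Seq[(Int, Any)]): Unit = {
  try {
    for ((bucket, value) <- records) writers(bucket).write(value)
    writers.foreach(_.commitAndClose())
  } catch {
    case e: Exception =>
      // Truncate anything this task wrote, then propagate the failure.
      writers.foreach(_.revertPartialWritesAndClose())
      throw e
  }
}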


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78f2af58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78f2af58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78f2af58

Branch: refs/heads/master
Commit: 78f2af582286b81e6dc9fa9d455ed2b369d933bd
Parents: b270309
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Aug 1 13:57:19 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Aug 1 13:57:19 2014 -0700

----------------------------------------------------------------------
 .../spark/shuffle/hash/HashShuffleWriter.scala  | 14 ++--
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  3 +-
 .../spark/storage/BlockObjectWriter.scala       | 53 +++++++-----
 .../spark/storage/ShuffleBlockManager.scala     | 28 ++++---
 .../util/collection/ExternalAppendOnlyMap.scala |  2 +-
 .../spark/util/collection/ExternalSorter.scala  |  6 +-
 .../spark/storage/DiskBlockManagerSuite.scala   | 87 +++++++++++++++++++-
 .../apache/spark/tools/StoragePerfTester.scala  |  5 +-
 8 files changed, 146 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 1923f7c..45d3b8b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V](
   }
 
   /** Close this writer, passing along whether the map completed */
-  override def stop(success: Boolean): Option[MapStatus] = {
+  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+    var success = initiallySuccess
     try {
       if (stopping) {
         return None
@@ -73,15 +74,16 @@ private[spark] class HashShuffleWriter[K, V](
       stopping = true
       if (success) {
         try {
-          return Some(commitWritesAndBuildStatus())
+          Some(commitWritesAndBuildStatus())
         } catch {
           case e: Exception =>
+            success = false
             revertWrites()
             throw e
         }
       } else {
         revertWrites()
-        return None
+        None
       }
     } finally {
       // Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
     var totalBytes = 0L
     var totalTime = 0L
     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-      writer.commit()
-      writer.close()
+      writer.commitAndClose()
       val size = writer.fileSegment().length
       totalBytes += size
       totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        writer.revertPartialWritesAndClose()
       }
     }
   }
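
Note on the hunk above: success is demoted to false before rethrowing so that the release step in the surrounding finally block does not record the reverted output. Sketched in isolation (a rough sketch; commitAll, revertAll and release are hypothetical stand-ins for commitWritesAndBuildStatus(), revertWrites() and shuffle.releaseWriters(success), and String stands in for MapStatus):

// Standalone sketch of the revised stop() control flow.
def stop(initiallySuccess: Boolean,
         commitAll: () => String,
         revertAll: () => Unit,
         release: Boolean => Unit): Option[String] = {
  var success = initiallySuccess
  try {
    if (success) {
      try {
        Some(commitAll())
      } catch {
        case e: Exception =>
          success = false  // make sure release() treats this attempt as failed
          revertAll()
          throw e
      }
    } else {
      revertAll()
      None
    }
  } finally {
    release(success)
  }
}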

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 42fcd07..9a356d0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
         for (elem <- elements) {
           writer.write(elem)
         }
-        writer.commit()
-        writer.close()
+        writer.commitAndClose()
         val segment = writer.fileSegment()
         offsets(id + 1) = segment.offset + segment.length
         lengths(id) = segment.length

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index a2687e6..01d46e1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
   def isOpen: Boolean
 
   /**
-   * Flush the partial writes and commit them as a single atomic block. Return the
-   * number of bytes written for this commit.
+   * Flush the partial writes and commit them as a single atomic block.
    */
-  def commit(): Long
+  def commitAndClose(): Unit
 
   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
-   * when there are runtime exceptions.
+   * when there are runtime exceptions. This method will not throw, though it may be
+   * unsuccessful in truncating written data.
    */
-  def revertPartialWrites()
+  def revertPartialWritesAndClose()
 
   /**
    * Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
   /**
    * Returns the file segment of committed data that this Writer has written.
+   * This is only valid after commitAndClose() has been called.
    */
   def fileSegment(): FileSegment
 
@@ -108,7 +109,7 @@ private[spark] class DiskBlockObjectWriter(
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()
-  private var lastValidPosition = initialPosition
+  private var finalPosition: Long = -1
   private var initialized = false
   private var _timeWriting = 0L
 
@@ -116,7 +117,6 @@ private[spark] class DiskBlockObjectWriter(
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
-    lastValidPosition = initialPosition
     bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
     initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
     if (initialized) {
      // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
      //       serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }
 
-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " 
+ file, e)
     }
   }
 
@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(
 
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-    lastValidPosition - initialPosition
+    assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
+    finalPosition - initialPosition
   }
 }
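
Taken together, these changes replace the running lastValidPosition with a single finalPosition captured at commitAndClose(). A simplified standalone model of the new contract (ignoring the serializer, compression and timing streams the real DiskBlockObjectWriter manages):

import java.io.{File, FileOutputStream}

// Simplified model, not the real DiskBlockObjectWriter.
class SketchDiskWriter(file: File) {
  private val initialPosition: Long = file.length()
  private var finalPosition: Long = -1L
  private val out = new FileOutputStream(file, true)

  def write(bytes: Array[Byte]): Unit = out.write(bytes)

  def commitAndClose(): Unit = {
    out.flush()
    out.close()
    finalPosition = file.length()  // committed length is captured exactly once
  }

  def revertPartialWritesAndClose(): Unit = {
    try {
      out.close()
      val truncateStream = new FileOutputStream(file, true)
      try truncateStream.getChannel.truncate(initialPosition)
      finally truncateStream.close()
    } catch {
      case _: Exception => ()  // best effort: revert never throws
    }
  }

  def bytesWritten: Long = {
    assert(finalPosition != -1, "bytesWritten is only valid after commitAndClose()")
    finalPosition - initialPosition
  }
}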

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 7beb55c..28aa35b 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         if (consolidateShuffleFiles) {
           if (success) {
             val offsets = writers.map(_.fileSegment().offset)
-            fileGroup.recordMapOutput(mapId, offsets)
+            val lengths = writers.map(_.fileSegment().length)
+            fileGroup.recordMapOutput(mapId, offsets, lengths)
           }
           recycleFileGroup(fileGroup)
         } else {
@@ -247,6 +248,8 @@ object ShuffleBlockManager {
   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
    */
  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+    private var numBlocks: Int = 0
+
     /**
     * Stores the absolute index of each mapId in the files of this group. For instance,
      * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
@@ -254,23 +257,27 @@ object ShuffleBlockManager {
     private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
     /**
-     * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
-     * This ordering allows us to compute block lengths by examining the following block offset.
+     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+     * position in the file.
     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
      * reducer.
      */
    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
       new PrimitiveVector[Long]()
     }
-
-    def numBlocks = mapIdToIndex.size
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
 
     def apply(bucketId: Int) = files(bucketId)
 
-    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+      assert(offsets.length == lengths.length)
       mapIdToIndex(mapId) = numBlocks
+      numBlocks += 1
       for (i <- 0 until offsets.length) {
         blockOffsetsByReducer(i) += offsets(i)
+        blockLengthsByReducer(i) += lengths(i)
       }
     }
 
@@ -278,16 +285,11 @@ object ShuffleBlockManager {
     def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
       val file = files(reducerId)
       val blockOffsets = blockOffsetsByReducer(reducerId)
+      val blockLengths = blockLengthsByReducer(reducerId)
       val index = mapIdToIndex.getOrElse(mapId, -1)
       if (index >= 0) {
         val offset = blockOffsets(index)
-        val length =
-          if (index + 1 < numBlocks) {
-            blockOffsets(index + 1) - offset
-          } else {
-            file.length() - offset
-          }
-        assert(length >= 0)
+        val length = blockLengths(index)
         Some(new FileSegment(file, offset, length))
       } else {
         None
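
A small worked example (made-up numbers) of why ShuffleFileGroup now records lengths instead of inferring them: suppose a reducer file already holds one committed 100-byte block at offset 0, and another map task has appended 40 bytes that are not yet registered. Inferring the last block's length from the current file length gives the wrong answer, while the length recorded at commit time stays correct:

// Illustrative numbers only.
val recordedOffsets = Vector(0L)    // offsets registered so far for this reducer file
val recordedLengths = Vector(100L)  // lengths now registered alongside the offsets
val currentFileLength = 140L        // includes 40 not-yet-registered bytes

// Old behavior: the last block's length fell out of the file length.
val inferredLength = currentFileLength - recordedOffsets.last  // 140, wrong
// New behavior: the recorded length is used directly.
val recordedLength = recordedLengths.last                      // 100, correct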

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index b34512e..cb67a1c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Flush the disk writer's contents to disk, and update relevant variables
     def flush() = {
-      writer.commit()
+      writer.commitAndClose()
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 54c3310..6e415a2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
     // How many elements we have in each partition
     val elementsPerPartition = new Array[Long](numPartitions)
 
-    // Flush the disk writer's contents to disk, and update relevant variables
+    // Flush the disk writer's contents to disk, and update relevant variables.
+    // The writer is closed at the end of this process, and cannot be reused.
     def flush() = {
-      writer.commit()
+      writer.commitAndClose()
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
@@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](
 
         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
          writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
         }
       }
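
Because commitAndClose() also closes the writer, the spill loop above opens a fresh writer for each batch rather than reusing one after an explicit close(). A rough standalone sketch of that pattern (BatchWriter, newWriter and spillInBatches are illustrative names, not Spark APIs):

// Illustrative batching loop: a committed writer cannot be reused.
trait BatchWriter {
  def write(value: Any): Unit
  def commitAndClose(): Unit
  def revertPartialWritesAndClose(): Unit
  def bytesWritten: Long
}

def spillInBatches(elements: Iterator[Any], batchSize: Int,
                   newWriter: () => BatchWriter): Seq[Long] = {
  val batchSizes = scala.collection.mutable.ArrayBuffer[Long]()
  var writer = newWriter()
  var objectsWritten = 0

  def flush(): Unit = {
    writer.commitAndClose()            // closes the writer for good
    batchSizes += writer.bytesWritten  // only valid after commitAndClose()
    objectsWritten = 0
  }

  for (elem <- elements) {
    writer.write(elem)
    objectsWritten += 1
    if (objectsWritten == batchSize) {
      flush()
      writer = newWriter()             // fresh writer for the next batch
    }
  }
  if (objectsWritten > 0) flush() else writer.revertPartialWritesAndClose()
  batchSizes.toSeq
}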

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aaa7714..985ac93 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -22,11 +22,14 @@ import java.io.{File, FileWriter}
 import scala.collection.mutable
 import scala.language.reflectiveCalls
 
+import akka.actor.Props
 import com.google.common.io.Files
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
   private val testConf = new SparkConf(false)
@@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
     newFile.delete()
   }
 
+  private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+    assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+    assert (segment1.offset === segment2.offset)
+    assert (segment1.length === segment2.length)
+  }
+
+  test("consolidated shuffle can write to shuffle group without messing 
existing offsets/lengths") {
+
+    val serializer = new JavaSerializer(testConf)
+    val confCopy = testConf.clone
+    // Reset after EACH object write. This is to ensure that there are bytes appended after
+    // an object is written, so if a codepath assumes writeObject is the end of the data, this
+    // should flush those bugs out. This was a common bug in ExternalAppendOnlyMap, etc.
+    confCopy.set("spark.serializer.objectStreamReset", "1")
+
+    val securityManager = new org.apache.spark.SecurityManager(confCopy)
+    // Do not use the shuffleBlockManager above!
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
+      securityManager)
+    val master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
+      confCopy)
+    val store = new BlockManager("<driver>", actorSystem, master, serializer, confCopy,
+      securityManager, null)
+
+    try {
+
+      val shuffleManager = store.shuffleBlockManager
+
+      val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer)
+      for (writer <- shuffle1.writers) {
+        writer.write("test1")
+        writer.write("test2")
+      }
+      for (writer <- shuffle1.writers) {
+        writer.commitAndClose()
+      }
+
+      val shuffle1Segment = shuffle1.writers(0).fileSegment()
+      shuffle1.releaseWriters(success = true)
+
+      val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf))
+
+      for (writer <- shuffle2.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle2.writers) {
+        writer.commitAndClose()
+      }
+      val shuffle2Segment = shuffle2.writers(0).fileSegment()
+      shuffle2.releaseWriters(success = true)
+
+      // Now comes the test:
+      // Write to shuffle 3 and close it, but before registering it, check that the file segments
+      // for the previous tasks (e.g. shuffle1) are the same as the recorded 'segments'. Earlier,
+      // we inferred the length of a block from the remaining data in the file, which could mess
+      // things up when there are concurrent reads and writes to the same shuffle group.
+
+      val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf))
+      for (writer <- shuffle3.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle3.writers) {
+        writer.commitAndClose()
+      }
+      // check before we register.
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+      shuffle3.releaseWriters(success = true)
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+      shuffleManager.removeShuffle(1)
+    } finally {
+
+      if (store != null) {
+        store.stop()
+      }
+      actorSystem.shutdown()
+      actorSystem.awaitTermination()
+    }
+  }
+
   def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
     val segment = diskBlockManager.getBlockLocation(blockId)
     assert(segment.file.getName === filename)

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c356..8a05fcb 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -61,10 +61,9 @@ object StoragePerfTester {
       for (i <- 1 to recordsPerMap) {
         writers(i % numOutputSplits).write(writeData)
       }
-      writers.map {w =>
-        w.commit()
+      writers.map { w =>
+        w.commitAndClose()
         total.addAndGet(w.fileSegment().length)
-        w.close()
       }
 
       shuffle.releaseWriters(true)
