Repository: spark

Updated Branches:
  refs/heads/master b270309d7 -> 78f2af582
SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation

All changes from this PR are by mridulm and are drawn from his work in #1609. This patch
is intended to fix all major issues related to shuffle file consolidation that mridulm
found, while minimizing changes to the code, with the hope that it may be more easily
merged into 1.1.

This patch is **not** intended as a replacement for #1609, which provides many additional
benefits, including fixes to ExternalAppendOnlyMap, improvements to DiskBlockObjectWriter's
API, and several new unit tests. If it is feasible to merge #1609 for the 1.1 deadline,
that is a preferable option.

Author: Aaron Davidson <aa...@databricks.com>

Closes #1678 from aarondav/consol and squashes the following commits:

53b3f6d [Aaron Davidson] Correct behavior when writing unopened file
701d045 [Aaron Davidson] Rebase with sort-based shuffle
9160149 [Aaron Davidson] SPARK-2532: Minimal shuffle consolidation fixes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78f2af58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78f2af58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78f2af58

Branch: refs/heads/master
Commit: 78f2af582286b81e6dc9fa9d455ed2b369d933bd
Parents: b270309
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Aug 1 13:57:19 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Aug 1 13:57:19 2014 -0700

----------------------------------------------------------------------
 .../spark/shuffle/hash/HashShuffleWriter.scala  | 14 ++--
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  3 +-
 .../spark/storage/BlockObjectWriter.scala       | 53 +++++++-----
 .../spark/storage/ShuffleBlockManager.scala     | 28 ++++---
 .../util/collection/ExternalAppendOnlyMap.scala |  2 +-
 .../spark/util/collection/ExternalSorter.scala  |  6 +-
 .../spark/storage/DiskBlockManagerSuite.scala   | 87 +++++++++++++++++++-
 .../apache/spark/tools/StoragePerfTester.scala  |  5 +-
 8 files changed, 146 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
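
A minimal sketch of the commit-or-revert flow that HashShuffleWriter.stop() adopts in this
patch. SimpleWriter and SketchMapTask below are illustrative stand-ins, not Spark's real
BlockObjectWriter or MapStatus types; the point is only that a failed commit now flips the
success flag, reverts every writer, and rethrows, so partial output is never reported as a
valid map status.

// Illustrative stand-ins; Spark's real writer and status types differ.
trait SimpleWriter {
  def commitAndClose(): Unit
  def revertPartialWritesAndClose(): Unit
  def bytesWritten: Long
}

class SketchMapTask(writers: Seq[SimpleWriter]) {
  private var stopping = false

  /** Commit every writer on success; revert them all (and rethrow) if anything fails. */
  def stop(initiallySuccess: Boolean): Option[Seq[Long]] = {
    var success = initiallySuccess
    try {
      if (stopping) {
        return None
      }
      stopping = true
      if (success) {
        try {
          writers.foreach(_.commitAndClose())
          Some(writers.map(_.bytesWritten))
        } catch {
          case e: Exception =>
            success = false // the cleanup below must see the failure
            writers.foreach(_.revertPartialWritesAndClose())
            throw e
        }
      } else {
        writers.foreach(_.revertPartialWritesAndClose())
        None
      }
    } finally {
      // Release the writers back to their pool here, passing along `success`.
    }
  }
}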

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 1923f7c..45d3b8b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V](
   }
 
   /** Close this writer, passing along whether the map completed */
-  override def stop(success: Boolean): Option[MapStatus] = {
+  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+    var success = initiallySuccess
     try {
       if (stopping) {
         return None
@@ -73,15 +74,16 @@
       stopping = true
       if (success) {
         try {
-          return Some(commitWritesAndBuildStatus())
+          Some(commitWritesAndBuildStatus())
         } catch {
           case e: Exception =>
+            success = false
             revertWrites()
             throw e
         }
       } else {
         revertWrites()
-        return None
+        None
       }
     } finally {
       // Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
     var totalBytes = 0L
     var totalTime = 0L
     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-      writer.commit()
-      writer.close()
+      writer.commitAndClose()
       val size = writer.fileSegment().length
       totalBytes += size
       totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        writer.revertPartialWritesAndClose()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 42fcd07..9a356d0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
         for (elem <- elements) {
           writer.write(elem)
         }
-        writer.commit()
-        writer.close()
+        writer.commitAndClose()
         val segment = writer.fileSegment()
         offsets(id + 1) = segment.offset + segment.length
         lengths(id) = segment.length

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index a2687e6..01d46e1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
   def isOpen: Boolean
 
   /**
-   * Flush the partial writes and commit them as a single atomic block. Return the
-   * number of bytes written for this commit.
+   * Flush the partial writes and commit them as a single atomic block.
    */
-  def commit(): Long
+  def commitAndClose(): Unit
 
   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
-   * when there are runtime exceptions.
+   * when there are runtime exceptions. This method will not throw, though it may be
+   * unsuccessful in truncating written data.
    */
-  def revertPartialWrites()
+  def revertPartialWritesAndClose()
 
   /**
    * Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
   /**
    * Returns the file segment of committed data that this Writer has written.
+   * This is only valid after commitAndClose() has been called.
    */
   def fileSegment(): FileSegment
 
@@ -108,7 +109,7 @@ private[spark] class DiskBlockObjectWriter(
 
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()
-  private var lastValidPosition = initialPosition
+  private var finalPosition: Long = -1
   private var initialized = false
   private var _timeWriting = 0L
@@ -116,7 +117,6 @@ private[spark] class DiskBlockObjectWriter(
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
-    lastValidPosition = initialPosition
     bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
     initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }
 
-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
     }
   }
@@ -188,6 +196,7 @@
 
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-    lastValidPosition - initialPosition
+    assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
+    finalPosition - initialPosition
   }
 }
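
A condensed, self-contained sketch of the writer lifecycle DiskBlockObjectWriter now follows,
with plain java.io in place of Spark's serialization, compression, and metrics streams:
remember the file length at open, set the final position only in commitAndClose(), and revert
by truncating back to the initial position. AppendingWriter is an illustrative stand-in, not
the real class.

import java.io.{File, FileOutputStream, OutputStreamWriter}

// Condensed illustration of the new contract; not the real DiskBlockObjectWriter.
class AppendingWriter(file: File) {
  private val initialPosition = file.length()
  private var finalPosition: Long = -1
  private val out = new OutputStreamWriter(new FileOutputStream(file, true)) // append mode

  def write(s: String): Unit = out.write(s)

  /** Flush, close, and record where the committed data ends. */
  def commitAndClose(): Unit = {
    out.flush()
    out.close()
    finalPosition = file.length()
  }

  /** Close and truncate the file back to where this writer started; never throws. */
  def revertPartialWritesAndClose(): Unit = {
    try {
      out.flush()
      out.close()
      val truncateStream = new FileOutputStream(file, true)
      try {
        truncateStream.getChannel.truncate(initialPosition)
      } finally {
        truncateStream.close()
      }
    } catch {
      case e: Exception => () // best effort: log rather than propagate
    }
  }

  /** Only meaningful after a successful commitAndClose(). */
  def bytesWritten: Long = {
    assert(finalPosition != -1, "bytesWritten is only valid after commitAndClose()")
    finalPosition - initialPosition
  }
}

The key difference from the old commit(): the writer is closed as part of the commit, and the
committed size is recorded eagerly rather than recomputed later from stream positions.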

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 7beb55c..28aa35b 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         if (consolidateShuffleFiles) {
           if (success) {
             val offsets = writers.map(_.fileSegment().offset)
-            fileGroup.recordMapOutput(mapId, offsets)
+            val lengths = writers.map(_.fileSegment().length)
+            fileGroup.recordMapOutput(mapId, offsets, lengths)
           }
           recycleFileGroup(fileGroup)
         } else {
@@ -247,6 +248,8 @@ object ShuffleBlockManager {
    * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
    */
   private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+    private var numBlocks: Int = 0
+
     /**
      * Stores the absolute index of each mapId in the files of this group. For instance,
      * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
@@ -254,23 +257,27 @@ object ShuffleBlockManager {
     private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
     /**
-     * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
-     * This ordering allows us to compute block lengths by examining the following block offset.
+     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+     * position in the file.
      * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
      * reducer.
      */
     private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
       new PrimitiveVector[Long]()
     }
-
-    def numBlocks = mapIdToIndex.size
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
 
     def apply(bucketId: Int) = files(bucketId)
 
-    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+      assert(offsets.length == lengths.length)
       mapIdToIndex(mapId) = numBlocks
+      numBlocks += 1
       for (i <- 0 until offsets.length) {
         blockOffsetsByReducer(i) += offsets(i)
+        blockLengthsByReducer(i) += lengths(i)
       }
     }
 
@@ -278,16 +285,11 @@ object ShuffleBlockManager {
     def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
       val file = files(reducerId)
       val blockOffsets = blockOffsetsByReducer(reducerId)
+      val blockLengths = blockLengthsByReducer(reducerId)
       val index = mapIdToIndex.getOrElse(mapId, -1)
       if (index >= 0) {
         val offset = blockOffsets(index)
-        val length =
-          if (index + 1 < numBlocks) {
-            blockOffsets(index + 1) - offset
-          } else {
-            file.length() - offset
-          }
-        assert(length >= 0)
+        val length = blockLengths(index)
         Some(new FileSegment(file, offset, length))
       } else {
         None
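
A small sketch of the bookkeeping change in ShuffleFileGroup, with ordinary Scala collections
standing in for PrimitiveVector and PrimitiveKeyOpenHashMap: block lengths are now recorded at
commit time, so a segment's length never has to be inferred from the next block's offset or
from the file's current length. FileGroupIndex and Segment below are hypothetical names for
illustration only.

import scala.collection.mutable

case class Segment(offset: Long, length: Long)

// Stand-in for ShuffleFileGroup's index; one entry per reducer file.
class FileGroupIndex(numReducers: Int) {
  private var numBlocks = 0
  private val mapIdToIndex = mutable.Map.empty[Int, Int]
  private val offsetsByReducer = Array.fill(numReducers)(mutable.ArrayBuffer.empty[Long])
  private val lengthsByReducer = Array.fill(numReducers)(mutable.ArrayBuffer.empty[Long])

  /** Record one map task's committed offset and length in every reducer file. */
  def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]): Unit = {
    require(offsets.length == lengths.length && offsets.length == numReducers)
    mapIdToIndex(mapId) = numBlocks
    numBlocks += 1
    for (i <- offsets.indices) {
      offsetsByReducer(i) += offsets(i)
      lengthsByReducer(i) += lengths(i)
    }
  }

  /** Length comes from what the writer committed, not from neighbouring offsets. */
  def segmentFor(mapId: Int, reducerId: Int): Option[Segment] =
    mapIdToIndex.get(mapId).map { index =>
      Segment(offsetsByReducer(reducerId)(index), lengthsByReducer(reducerId)(index))
    }
}

object FileGroupIndexDemo extends App {
  val idx = new FileGroupIndex(numReducers = 2)
  idx.recordMapOutput(mapId = 5, offsets = Array(0L, 0L), lengths = Array(10L, 12L))
  assert(idx.segmentFor(5, 1) == Some(Segment(0L, 12L))) // unaffected by later file growth
}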

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index b34512e..cb67a1c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
       // Flush the disk writer's contents to disk, and update relevant variables
       def flush() = {
-        writer.commit()
+        writer.commitAndClose()
         val bytesWritten = writer.bytesWritten
         batchSizes.append(bytesWritten)
         _diskBytesSpilled += bytesWritten

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 54c3310..6e415a2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
     // How many elements we have in each partition
     val elementsPerPartition = new Array[Long](numPartitions)
 
-    // Flush the disk writer's contents to disk, and update relevant variables
+    // Flush the disk writer's contents to disk, and update relevant variables.
+    // The writer is closed at the end of this process, and cannot be reused.
     def flush() = {
-      writer.commit()
+      writer.commitAndClose()
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
@@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](
 
         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
         }
       }
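
A short sketch of the spill-batch pattern that ExternalSorter and ExternalAppendOnlyMap follow
after this change, reusing the AppendingWriter stand-in from the BlockObjectWriter notes above
(the batch size and data here are arbitrary): commitAndClose() ends a writer's life, so each
flushed batch gets a fresh writer instead of reusing the old one.

import java.io.File

object SpillSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("spill", ".dat")
    val serializerBatchSize = 100
    val batchSizes = scala.collection.mutable.ArrayBuffer.empty[Long]

    var writer = new AppendingWriter(file)
    var objectsWritten = 0

    def flush(): Unit = {
      writer.commitAndClose()              // also closes; this writer cannot be reused
      batchSizes += writer.bytesWritten
    }

    for (elem <- (1 to 1000).map(i => s"record-$i\n")) {
      writer.write(elem)
      objectsWritten += 1
      if (objectsWritten == serializerBatchSize) {
        flush()
        writer = new AppendingWriter(file) // fresh writer for the next batch
        objectsWritten = 0
      }
    }
    if (objectsWritten > 0) flush()

    println(s"spilled ${batchSizes.sum} bytes in ${batchSizes.size} batches")
    file.delete()
  }
}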

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aaa7714..985ac93 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -22,11 +22,14 @@ import java.io.{File, FileWriter}
 import scala.collection.mutable
 import scala.language.reflectiveCalls
 
+import akka.actor.Props
 import com.google.common.io.Files
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
   private val testConf = new SparkConf(false)
@@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
     newFile.delete()
   }
 
+  private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+    assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+    assert (segment1.offset === segment2.offset)
+    assert (segment1.length === segment2.length)
+  }
+
+  test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
+
+    val serializer = new JavaSerializer(testConf)
+    val confCopy = testConf.clone
+    // Reset after EACH object write. This is to ensure that there are bytes appended after
+    // an object is written, so if the codepaths assume writeObject is the end of data, this
+    // should flush those bugs out. This was a common bug in ExternalAppendOnlyMap, etc.
+    confCopy.set("spark.serializer.objectStreamReset", "1")
+
+    val securityManager = new org.apache.spark.SecurityManager(confCopy)
+    // Do not use the shuffleBlockManager above!
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
+      securityManager)
+    val master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
+      confCopy)
+    val store = new BlockManager("<driver>", actorSystem, master, serializer, confCopy,
+      securityManager, null)
+
+    try {
+
+      val shuffleManager = store.shuffleBlockManager
+
+      val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer)
+      for (writer <- shuffle1.writers) {
+        writer.write("test1")
+        writer.write("test2")
+      }
+      for (writer <- shuffle1.writers) {
+        writer.commitAndClose()
+      }
+
+      val shuffle1Segment = shuffle1.writers(0).fileSegment()
+      shuffle1.releaseWriters(success = true)
+
+      val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf))
+
+      for (writer <- shuffle2.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle2.writers) {
+        writer.commitAndClose()
+      }
+      val shuffle2Segment = shuffle2.writers(0).fileSegment()
+      shuffle2.releaseWriters(success = true)
+
+      // Now comes the test:
+      // Write to shuffle 3 and close it, but before registering it, check that the file
+      // segments recorded for the previous tasks (shuffle1, shuffle2) are unchanged. Earlier,
+      // we inferred the length of a block from the remaining data in the file, which could
+      // mess things up when there are concurrent reads and writes to the same shuffle group.
+
+      val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf))
+      for (writer <- shuffle3.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle3.writers) {
+        writer.commitAndClose()
+      }
+      // Check before we register.
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+      shuffle3.releaseWriters(success = true)
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+      shuffleManager.removeShuffle(1)
+    } finally {
+
+      if (store != null) {
+        store.stop()
+      }
+      actorSystem.shutdown()
+      actorSystem.awaitTermination()
+    }
+  }
+
   def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
     val segment = diskBlockManager.getBlockLocation(blockId)
     assert(segment.file.getName === filename)

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c356..8a05fcb 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -61,10 +61,9 @@ object StoragePerfTester {
 
       for (i <- 1 to recordsPerMap) {
         writers(i % numOutputSplits).write(writeData)
       }
-      writers.map {w =>
-        w.commit()
+      writers.map { w =>
+        w.commitAndClose()
         total.addAndGet(w.fileSegment().length)
-        w.close()
       }
       shuffle.releaseWriters(true)