Repository: spark Updated Branches: refs/heads/master b3862d3c5 -> 8023242e7
[SPARK-10761] Refactor DiskBlockObjectWriter to not require BlockId The DiskBlockObjectWriter constructor took a BlockId parameter but never used it. As part of some general cleanup in these interfaces, this patch refactors its constructor to eliminate this parameter. Author: Josh Rosen <joshro...@databricks.com> Closes #8871 from JoshRosen/disk-block-object-writer-blockid-cleanup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8023242e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8023242e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8023242e Branch: refs/heads/master Commit: 8023242e77e4a799de6edc490078285684549b6d Parents: b3862d3 Author: Josh Rosen <joshro...@databricks.com> Authored: Thu Sep 24 14:18:33 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Thu Sep 24 14:18:33 2015 -0700 ---------------------------------------------------------------------- .../sort/BypassMergeSortShuffleWriter.java | 9 +++--- .../shuffle/IndexShuffleBlockResolver.scala | 3 +- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../spark/storage/DiskBlockObjectWriter.scala | 7 +++-- .../unsafe/UnsafeShuffleWriterSuite.java | 1 - .../unsafe/sort/UnsafeExternalSorterSuite.java | 1 - .../BypassMergeSortShuffleWriterSuite.scala | 1 - .../storage/DiskBlockObjectWriterSuite.scala | 33 ++++++++++---------- 8 files changed, 27 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 0b8b604..f5d80bb 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -151,7 +151,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter< } finally { Closeables.close(in, copyThrewException); } - if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) { + if (!partitionWriters[i].fileSegment().file().delete()) { logger.error("Unable to delete file for partition {}", i); } } @@ -168,12 +168,11 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter< public void stop() throws IOException { if (partitionWriters != null) { try { - final DiskBlockManager diskBlockManager = blockManager.diskBlockManager(); for (DiskBlockObjectWriter writer : partitionWriters) { // This method explicitly does _not_ throw exceptions: - writer.revertPartialWritesAndClose(); - if (!diskBlockManager.getFile(writer.blockId()).delete()) { - logger.error("Error while deleting file for block {}", writer.blockId()); + File file = writer.revertPartialWritesAndClose(); + if (!file.delete()) { + logger.error("Error while deleting file {}", file.getAbsolutePath()); } } } finally { http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 65887d1..5e4c2b5 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -119,9 +119,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB } private[spark] object IndexShuffleBlockResolver { - // No-op reduce ID used in interactions with disk store and DiskBlockObjectWriter. + // No-op reduce ID used in interactions with disk store. // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort // shuffle outputs for several reduces are glommed into a single file. - // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId. val NOOP_REDUCE_ID = 0 } http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index bca3942..47bd2ef 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -669,7 +669,7 @@ private[spark] class BlockManager( writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) val syncWrites = conf.getBoolean("spark.shuffle.sync", false) - new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream, + new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream, syncWrites, writeMetrics) } http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 49d9154..80d426f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -34,7 +34,6 @@ import org.apache.spark.util.Utils * reopened again. */ private[spark] class DiskBlockObjectWriter( - val blockId: BlockId, file: File, serializerInstance: SerializerInstance, bufferSize: Int, @@ -144,8 +143,10 @@ private[spark] class DiskBlockObjectWriter( * Reverts writes that haven't been flushed yet. Callers should invoke this function * when there are runtime exceptions. This method will not throw, though it may be * unsuccessful in truncating written data. + * + * @return the file that this DiskBlockObjectWriter wrote to. */ - def revertPartialWritesAndClose() { + def revertPartialWritesAndClose(): File = { // Discard current writes. We do this by flushing the outstanding writes and then // truncating the file to its initial position. try { @@ -160,12 +161,14 @@ private[spark] class DiskBlockObjectWriter( val truncateStream = new FileOutputStream(file, true) try { truncateStream.getChannel.truncate(initialPosition) + file } finally { truncateStream.close() } } catch { case e: Exception => logError("Uncaught exception while reverting partial writes to file " + file, e) + file } } http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index a266b0c..d218344 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -129,7 +129,6 @@ public class UnsafeShuffleWriterSuite { Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( - (BlockId) args[0], (File) args[1], (SerializerInstance) args[2], (Integer) args[3], http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index 445a37b..a5bbaa9 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -127,7 +127,6 @@ public class UnsafeExternalSorterSuite { Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( - (BlockId) args[0], (File) args[1], (SerializerInstance) args[2], (Integer) args[3], http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index cc7342f..341f56d 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -72,7 +72,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = { val args = invocation.getArguments new DiskBlockObjectWriter( - args(0).asInstanceOf[BlockId], args(1).asInstanceOf[File], args(2).asInstanceOf[SerializerInstance], args(3).asInstanceOf[Int], http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala index 66af6e1..7c19531 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -20,7 +20,6 @@ import java.io.File import org.scalatest.BeforeAndAfterEach -import org.apache.spark.SparkConf import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.serializer.JavaSerializer @@ -41,8 +40,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("verify write metrics") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.write(Long.box(20), Long.box(30)) // Record metrics update on every write @@ -63,8 +62,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("verify write metrics on revert") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.write(Long.box(20), Long.box(30)) // Record metrics update on every write @@ -86,8 +85,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("Reopening a closed block writer") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.open() writer.close() @@ -99,8 +98,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -115,8 +114,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("commitAndClose() should be idempotent") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -133,8 +132,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("revertPartialWritesAndClose() should be idempotent") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -151,8 +150,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("fileSegment() can only be called after commitAndClose() has been called") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -165,8 +164,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("commitAndClose() without ever opening or writing") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.commitAndClose() assert(writer.fileSegment().length === 0) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org