Repository: spark
Updated Branches:
  refs/heads/master b3862d3c5 -> 8023242e7


[SPARK-10761] Refactor DiskBlockObjectWriter to not require BlockId

The DiskBlockObjectWriter constructor took a BlockId parameter but never
used it. As part of general cleanup of these interfaces, this patch removes
that unused parameter from the constructor.

Author: Josh Rosen <joshro...@databricks.com>

Closes #8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.
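
For readers who want to see the caller-facing effect of this refactor outside the
diff, the following is a minimal, self-contained sketch (not part of this patch)
exercising the new API. It reuses the constructor arguments from the updated
DiskBlockObjectWriterSuite below; the temp file, object name, and println are
illustrative assumptions, and since DiskBlockObjectWriter is private[spark] the
sketch assumes it is compiled under the org.apache.spark package:

  package org.apache.spark

  import java.io.File

  import org.apache.spark.executor.ShuffleWriteMetrics
  import org.apache.spark.serializer.JavaSerializer
  import org.apache.spark.storage.DiskBlockObjectWriter

  object DiskBlockObjectWriterSketch {
    def main(args: Array[String]): Unit = {
      val file = File.createTempFile("disk-writer-sketch", ".tmp")
      val writeMetrics = new ShuffleWriteMetrics()
      // The constructor no longer takes a BlockId; only the target file plus the
      // serialization, buffering, compression, sync, and metrics parameters remain.
      val writer = new DiskBlockObjectWriter(
        file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
      writer.write(Long.box(20), Long.box(30))
      // revertPartialWritesAndClose() now returns the file it wrote to, so a caller
      // can delete partial output without resolving a BlockId through DiskBlockManager.
      val revertedFile = writer.revertPartialWritesAndClose()
      if (!revertedFile.delete()) {
        println(s"Unable to delete ${revertedFile.getAbsolutePath}")
      }
    }
  }

This is the same pattern that BypassMergeSortShuffleWriter.stop() relies on in the
hunk below.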


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8023242e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8023242e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8023242e

Branch: refs/heads/master
Commit: 8023242e77e4a799de6edc490078285684549b6d
Parents: b3862d3
Author: Josh Rosen <joshro...@databricks.com>
Authored: Thu Sep 24 14:18:33 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Sep 24 14:18:33 2015 -0700

----------------------------------------------------------------------
 .../sort/BypassMergeSortShuffleWriter.java      |  9 +++---
 .../shuffle/IndexShuffleBlockResolver.scala     |  3 +-
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../spark/storage/DiskBlockObjectWriter.scala   |  7 +++--
 .../unsafe/UnsafeShuffleWriterSuite.java        |  1 -
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  1 -
 .../BypassMergeSortShuffleWriterSuite.scala     |  1 -
 .../storage/DiskBlockObjectWriterSuite.scala    | 33 ++++++++++----------
 8 files changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 0b8b604..f5d80bb 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -151,7 +151,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
         } finally {
           Closeables.close(in, copyThrewException);
         }
-        if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
+        if (!partitionWriters[i].fileSegment().file().delete()) {
           logger.error("Unable to delete file for partition {}", i);
         }
       }
@@ -168,12 +168,11 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
   public void stop() throws IOException {
     if (partitionWriters != null) {
       try {
-        final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
         for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
-          writer.revertPartialWritesAndClose();
-          if (!diskBlockManager.getFile(writer.blockId()).delete()) {
-            logger.error("Error while deleting file for block {}", writer.blockId());
+          File file = writer.revertPartialWritesAndClose();
+          if (!file.delete()) {
+            logger.error("Error while deleting file {}", file.getAbsolutePath());
           }
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 65887d1..5e4c2b5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -119,9 +119,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
 }
 
 private[spark] object IndexShuffleBlockResolver {
-  // No-op reduce ID used in interactions with disk store and DiskBlockObjectWriter.
+  // No-op reduce ID used in interactions with disk store.
   // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
   // shuffle outputs for several reduces are glommed into a single file.
-  // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
   val NOOP_REDUCE_ID = 0
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index bca3942..47bd2ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -669,7 +669,7 @@ private[spark] class BlockManager(
       writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
    val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
     val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
-    new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream,
+    new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
       syncWrites, writeMetrics)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 49d9154..80d426f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -34,7 +34,6 @@ import org.apache.spark.util.Utils
  * reopened again.
  */
 private[spark] class DiskBlockObjectWriter(
-    val blockId: BlockId,
     file: File,
     serializerInstance: SerializerInstance,
     bufferSize: Int,
@@ -144,8 +143,10 @@ private[spark] class DiskBlockObjectWriter(
   * Reverts writes that haven't been flushed yet. Callers should invoke this function
   * when there are runtime exceptions. This method will not throw, though it may be
    * unsuccessful in truncating written data.
+   *
+   * @return the file that this DiskBlockObjectWriter wrote to.
    */
-  def revertPartialWritesAndClose() {
+  def revertPartialWritesAndClose(): File = {
     // Discard current writes. We do this by flushing the outstanding writes and then
     // truncating the file to its initial position.
     try {
@@ -160,12 +161,14 @@ private[spark] class DiskBlockObjectWriter(
       val truncateStream = new FileOutputStream(file, true)
       try {
         truncateStream.getChannel.truncate(initialPosition)
+        file
       } finally {
         truncateStream.close()
       }
     } catch {
       case e: Exception =>
         logError("Uncaught exception while reverting partial writes to file " 
+ file, e)
+        file
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index a266b0c..d218344 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -129,7 +129,6 @@ public class UnsafeShuffleWriterSuite {
         Object[] args = invocationOnMock.getArguments();
 
         return new DiskBlockObjectWriter(
-          (BlockId) args[0],
           (File) args[1],
           (SerializerInstance) args[2],
           (Integer) args[3],

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 445a37b..a5bbaa9 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -127,7 +127,6 @@ public class UnsafeExternalSorterSuite {
         Object[] args = invocationOnMock.getArguments();
 
         return new DiskBlockObjectWriter(
-          (BlockId) args[0],
           (File) args[1],
           (SerializerInstance) args[2],
           (Integer) args[3],

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index cc7342f..341f56d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -72,7 +72,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
      override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
         val args = invocation.getArguments
         new DiskBlockObjectWriter(
-          args(0).asInstanceOf[BlockId],
           args(1).asInstanceOf[File],
           args(2).asInstanceOf[SerializerInstance],
           args(3).asInstanceOf[Int],

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 66af6e1..7c19531 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -20,7 +20,6 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.SparkConf
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.JavaSerializer
@@ -41,8 +40,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("verify write metrics") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.write(Long.box(20), Long.box(30))
     // Record metrics update on every write
@@ -63,8 +62,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("verify write metrics on revert") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.write(Long.box(20), Long.box(30))
     // Record metrics update on every write
@@ -86,8 +85,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("Reopening a closed block writer") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.open()
     writer.close()
@@ -99,8 +98,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -115,8 +114,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("commitAndClose() should be idempotent") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -133,8 +132,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("revertPartialWritesAndClose() should be idempotent") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -151,8 +150,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("fileSegment() can only be called after commitAndClose() has been called") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -165,8 +164,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("commitAndClose() without ever opening or writing") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     writer.commitAndClose()
     assert(writer.fileSegment().length === 0)
   }


