Repository: spark
Updated Branches:
  refs/heads/branch-1.3 de8b2d4be -> e4db5a327


[SPARK-3570] Include time to open files in shuffle write time.

The time to open shuffle files can be very significant when the disk is
contended, especially when using ext3. While writing data to
a file can avoid hitting the disk (and instead hit the buffer
cache), opening a file always involves writing some metadata
about the file to disk, so the open time can be a large
portion of the shuffle write time. In one job I ran recently, the time to
write shuffle data to the file was only 4ms for each task, but
the time to open the file was about 100x as long (~400ms).

When we add metrics about spilled data (#2504), we should ensure
that the file open time is also included there.
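The pattern applied in each of the changes below is a plain System.nanoTime
delta around the file-open / writer-creation step, added into the shuffle
write time metric. Here is a minimal, self-contained Scala sketch of that
pattern; the WriteMetrics class and temp-file creation are stand-ins for
Spark's ShuffleWriteMetrics and blockManager.getDiskWriter, not the real API:

// Minimal sketch of the timing pattern used in this patch: time only the
// open/creation step and charge it to the shuffle write time. WriteMetrics
// is a hypothetical stand-in for Spark's ShuffleWriteMetrics.
object OpenTimingSketch {
  final class WriteMetrics {
    private var shuffleWriteTimeNanos = 0L
    def incShuffleWriteTime(ns: Long): Unit = shuffleWriteTimeNanos += ns
    def shuffleWriteTime: Long = shuffleWriteTimeNanos
  }

  def main(args: Array[String]): Unit = {
    val metrics = new WriteMetrics
    val openStartTime = System.nanoTime  // record before creating any files/writers
    val files = Array.fill(8) {
      // Stand-in for getDiskWriter: creating the file touches the disk
      // (metadata write) even when the data itself lands in the buffer cache.
      val f = java.io.File.createTempFile("shuffle", ".data")
      f.deleteOnExit()
      f
    }
    // Only the aggregate open time is charged here; the write time itself is
    // measured separately by the disk writers.
    metrics.incShuffleWriteTime(System.nanoTime - openStartTime)
    println(s"opened ${files.length} files in ${metrics.shuffleWriteTime / 1000} us")
  }
}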

Author: Kay Ousterhout <kayousterh...@gmail.com>

Closes #4550 from kayousterhout/SPARK-3570 and squashes the following commits:

ea3a4ae [Kay Ousterhout] Added comment about excluded open time
fdc5185 [Kay Ousterhout] Improved comment
42b7e43 [Kay Ousterhout] Fixed parens for nanotime
2423555 [Kay Ousterhout] [SPARK-3570] Include time to open files in shuffle write time.

(cherry picked from commit d8ccf655f344eed65cdaf5d9252f1b565b8406ca)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4db5a32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4db5a32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4db5a32

Branch: refs/heads/branch-1.3
Commit: e4db5a327b2c5f65c442e2ff7ec333fbee8e27e7
Parents: de8b2d4
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Tue Mar 24 16:29:40 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Mar 24 16:29:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/shuffle/FileShuffleBlockManager.scala      | 4 ++++
 .../scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 3 +++
 .../scala/org/apache/spark/util/collection/ExternalSorter.scala | 5 +++++
 3 files changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e4db5a32/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 7de2f9c..7aec9f9 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       private val shuffleState = shuffleStates(shuffleId)
       private var fileGroup: ShuffleFileGroup = null
 
+      val openStartTime = System.nanoTime
       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
         fileGroup = getUnusedFileGroup()
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
         }
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
 
       override def releaseWriters(success: Boolean) {
         if (consolidateShuffleFiles) {

http://git-wip-us.apache.org/repos/asf/spark/blob/e4db5a32/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 27496c5..621b264 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
       sorter.insertAll(records)
     }
 
+    // Don't bother including the time to open the merged output file in the shuffle write time,
+    // because it just opens a single file, so is typically too fast to measure accurately
+    // (see SPARK-3570).
     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
     val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

http://git-wip-us.apache.org/repos/asf/spark/blob/e4db5a32/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index d69f2d9..244d4af 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -352,6 +352,7 @@ private[spark] class ExternalSorter[K, V, C](
     // Create our file writers if we haven't done so yet
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
+      val openStartTime = System.nanoTime
       partitionWriters = Array.fill(numPartitions) {
        // Because these files may be read during shuffle, their compression must be controlled by
        // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -359,6 +360,10 @@ private[spark] class ExternalSorter[K, V, C](
         val (blockId, file) = diskBlockManager.createTempShuffleBlock()
        blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, and can take a long time in aggregate when we open many files, so should be
+      // included in the shuffle write time.
+      curWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
     }
 
     // No need to sort stuff, just write each element out
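
One way to observe the effect of this change is a listener that prints each
task's shuffle write time, which after this patch includes the file open time.
This is a sketch assuming the Spark 1.3-era listener/metrics API, where
shuffleWriteMetrics is an Option and shuffleWriteTime is in nanoseconds:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hedged sketch: report per-task shuffle write time via a SparkListener.
class ShuffleWriteTimeListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    for {
      metrics <- Option(taskEnd.taskMetrics)  // may be null for failed tasks
      sw <- metrics.shuffleWriteMetrics       // Option[ShuffleWriteMetrics]
    } {
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"shuffle write time = ${sw.shuffleWriteTime / 1e6} ms")
    }
  }
}
// Register before running a shuffle job, e.g.:
//   sc.addSparkListener(new ShuffleWriteTimeListener)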

