Repository: spark
Updated Branches:
  refs/heads/master 5bdbedf22 -> ce756daa4


[SPARK-15569] Reduce frequency of updateBytesWritten function in Disk…

## What changes were proposed in this pull request?

Profiling a Spark job that spills a large amount of intermediate data, we found 
that a significant portion of time is spent in the 
DiskBlockObjectWriter.updateBytesWritten function. Looking at the code, the 
function is called on every 32nd record just to refresh the count of bytes 
written to disk, which is far too frequent. This change raises the update 
interval from every 32 records to every 16384 records to cut that overhead.
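
For context before the diff: the writer already batches its metric updates, and this patch only widens the batch. Below is a minimal, self-contained sketch of the pattern (the names BatchedMetricsSketch, MetricsUpdateInterval, and channelPosition are illustrative stand-ins, not Spark's actual API):

```scala
// Sketch of the batched-metrics pattern tuned by this patch (not Spark code).
// Querying the file position on every record is expensive, so the writer
// counts records and refreshes the bytes-written metric once per interval.
object BatchedMetricsSketch {
  // Hypothetical interval; the patch raises Spark's from 32 to 16384.
  private val MetricsUpdateInterval = 16384

  private var numRecordsWritten = 0L
  private var bytesWrittenMetric = 0L
  private var reportedPosition = 0L

  // Stand-in for the real position query (e.g. FileChannel.position());
  // here we pretend each record serializes to 8 bytes.
  private def channelPosition(): Long = numRecordsWritten * 8L

  def write(): Unit = {
    // ... serialize the record to the output stream ...
    numRecordsWritten += 1
    // Pay for the position query only once every MetricsUpdateInterval records.
    if (numRecordsWritten % MetricsUpdateInterval == 0) {
      updateBytesWritten()
    }
  }

  // Folds the bytes written since the last refresh into the metric.
  private def updateBytesWritten(): Unit = {
    val pos = channelPosition()
    bytesWrittenMetric += pos - reportedPosition
    reportedPosition = pos
  }

  def main(args: Array[String]): Unit = {
    (1 to 16384).foreach(_ => write())
    println(s"records=$numRecordsWritten bytesMetric=$bytesWrittenMetric")
  }
}
```

The tradeoff is only staleness of intermediate readings: as the updated test below asserts, file.length() == writeMetrics.bytesWritten still holds after commitAndClose(), so final totals remain exact.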

## How was this patch tested?

Tested by running the job on a cluster; this change produced a 20% CPU gain.

Author: Sital Kedia <ske...@fb.com>

Closes #13332 from sitalkedia/DiskObjectWriter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce756daa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce756daa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce756daa

Branch: refs/heads/master
Commit: ce756daa4f012ebdc5a41bf5a89ff11b6dfdab8c
Parents: 5bdbedf
Author: Sital Kedia <ske...@fb.com>
Authored: Fri May 27 11:22:39 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri May 27 11:22:39 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/storage/DiskBlockObjectWriter.scala    |  3 +--
 .../spark/storage/DiskBlockObjectWriterSuite.scala      | 12 ++++++------
 2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce756daa/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index ab97d2e..5b493f4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -203,8 +203,7 @@ private[spark] class DiskBlockObjectWriter(
     numRecordsWritten += 1
     writeMetrics.incRecordsWritten(1)
 
-    // TODO: call updateBytesWritten() less frequently.
-    if (numRecordsWritten % 32 == 0) {
+    if (numRecordsWritten % 16384 == 0) {
       updateBytesWritten()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce756daa/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 8eff3c2..ec4ef4b 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -53,13 +53,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.bytesWritten == 0)
-    // After 32 writes, metrics should update
-    for (i <- 0 until 32) {
+    // After 16384 writes, metrics should update
+    for (i <- 0 until 16384) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
     assert(writeMetrics.bytesWritten > 0)
-    assert(writeMetrics.recordsWritten === 33)
+    assert(writeMetrics.recordsWritten === 16385)
     writer.commitAndClose()
     assert(file.length() == writeMetrics.bytesWritten)
   }
@@ -75,13 +75,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.bytesWritten == 0)
-    // After 32 writes, metrics should update
-    for (i <- 0 until 32) {
+    // After 16384 writes, metrics should update
+    for (i <- 0 until 16384) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
     assert(writeMetrics.bytesWritten > 0)
-    assert(writeMetrics.recordsWritten === 33)
+    assert(writeMetrics.recordsWritten === 16385)
     writer.revertPartialWritesAndClose()
     assert(writeMetrics.bytesWritten == 0)
     assert(writeMetrics.recordsWritten == 0)


