Repository: spark
Updated Branches:
  refs/heads/master d935e0a9d -> 473d7552a


[SPARK-20014] Optimize mergeSpillsWithFileStream method

## What changes were proposed in this pull request?

When the individual partition size in a spill is small, the 
mergeSpillsWithTransferTo method performs many small disk IOs, which is very 
inefficient. One way to improve performance is to use the 
mergeSpillsWithFileStream method instead, by turning off transferTo and using 
buffered file reads/writes to improve the IO throughput.
However, the current implementation of mergeSpillsWithFileStream does not 
buffer its reads or writes of the spill files, and in addition it unnecessarily 
flushes the output file for each partition.
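
As an illustration only (a minimal sketch of the approach, not the patched Spark code; the class and method names below are hypothetical), the merge can route each partition through a flush/close-shielding wrapper while the shared output stream stays buffered:

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class BufferedMergeSketch {

  // Passes writes through but ignores flush() and close(), so per-partition
  // wrapper streams can be closed without flushing or closing the shared output.
  static class CloseAndFlushShieldOutputStream extends FilterOutputStream {
    CloseAndFlushShieldOutputStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len); // delegate the whole chunk instead of byte-by-byte
    }

    @Override
    public void flush() {
      // do nothing: the shared buffered stream is flushed once, at the end
    }

    @Override
    public void close() {
      // do nothing: the shared output stream outlives each partition
    }
  }

  // Copies each spill file into the merged output through large buffers, so the
  // merge issues a few large disk IOs instead of many tiny ones.
  static void mergeSpills(File[] spills, File output, int bufferBytes) throws IOException {
    try (OutputStream merged =
             new BufferedOutputStream(new FileOutputStream(output), bufferBytes)) {
      byte[] buf = new byte[bufferBytes];
      for (File spill : spills) {
        try (InputStream in =
                 new BufferedInputStream(new FileInputStream(spill), bufferBytes)) {
          // In the real writer, compression/encryption streams would wrap this and
          // be closed per partition; the shield keeps that from closing `merged`.
          OutputStream partitionOutput = new CloseAndFlushShieldOutputStream(merged);
          int n;
          while ((n = in.read(buf)) != -1) {
            partitionOutput.write(buf, 0, n);
          }
          partitionOutput.close(); // safe: the shield ignores it
        }
      }
    }
  }
}
```

The actual patch, shown in the diff below, uses Spark's NioBufferedFileInputStream for the spill inputs and sizes the buffers from spark.shuffle.file.buffer and the new spark.shuffle.unsafe.file.output.buffer setting.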

## How was this patch tested?

Tested this change by running a job on the cluster and the map stage run time 
was reduced by around 20%.
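
For reference, an example (not part of the patch itself) of the configuration that steers shuffle merging onto this buffered file-stream path; spark.file.transferTo and spark.shuffle.file.buffer are existing Spark settings, while spark.shuffle.unsafe.file.output.buffer is introduced by this change:

```java
import org.apache.spark.SparkConf;

public class ShuffleMergeConf {
  // Disable transferTo so merging uses the buffered file-stream path, then size
  // the per-spill input buffer and the merged-output buffer used during the merge.
  public static SparkConf bufferedMergeConf() {
    return new SparkConf()
        .set("spark.file.transferTo", "false")
        .set("spark.shuffle.file.buffer", "32k")
        .set("spark.shuffle.unsafe.file.output.buffer", "32k");
  }
}
```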

Author: Sital Kedia <ske...@fb.com>

Closes #17343 from sitalkedia/upstream_mergeSpillsWithFileStream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/473d7552
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/473d7552
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/473d7552

Branch: refs/heads/master
Commit: 473d7552acb19f440a0cb082e6d3cba67579bd5a
Parents: d935e0a
Author: Sital Kedia <ske...@fb.com>
Authored: Fri May 26 13:41:13 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri May 26 13:41:13 2017 -0700

----------------------------------------------------------------------
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 44 +++++++++++++++-----
 1 file changed, 33 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/473d7552/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 2fde5c3..857ec8a 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -40,6 +40,7 @@ import org.apache.spark.annotation.Private;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.io.CompressionCodec;
 import org.apache.spark.io.CompressionCodec$;
+import org.apache.spark.io.NioBufferedFileInputStream;
 import org.apache.commons.io.output.CloseShieldOutputStream;
 import org.apache.commons.io.output.CountingOutputStream;
 import org.apache.spark.memory.TaskMemoryManager;
@@ -98,6 +99,18 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
    */
   private boolean stopping = false;
 
+  private class CloseAndFlushShieldOutputStream extends CloseShieldOutputStream {
+
+    CloseAndFlushShieldOutputStream(OutputStream outputStream) {
+      super(outputStream);
+    }
+
+    @Override
+    public void flush() {
+      // do nothing
+    }
+  }
+
   public UnsafeShuffleWriter(
       BlockManager blockManager,
       IndexShuffleBlockResolver shuffleBlockResolver,
@@ -321,11 +334,15 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   }
 
   /**
-   * Merges spill files using Java FileStreams. This code path is slower than the NIO-based merge,
-   * {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File)}, so it's only used in
-   * cases where the IO compression codec does not support concatenation of compressed data, when
-   * encryption is enabled, or when users have explicitly disabled use of {@code transferTo} in
-   * order to work around kernel bugs.
+   * Merges spill files using Java FileStreams. This code path is typically slower than
+   * the NIO-based merge, {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[],
+   * File)}, and it's mostly used in cases where the IO compression codec does not support
+   * concatenation of compressed data, when encryption is enabled, or when users have
+   * explicitly disabled use of {@code transferTo} in order to work around kernel bugs.
+   * This code path might also be faster in cases where the individual partition size in a
+   * spill is small and the UnsafeShuffleWriter#mergeSpillsWithTransferTo method performs
+   * many small disk IOs, which is inefficient. In those cases, using large buffers for the
+   * input and output files helps reduce the number of disk IOs, making the file merging faster.
    *
    * @param spills the spills to merge.
    * @param outputFile the file to write the merged data to.
@@ -339,23 +356,28 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     assert (spills.length >= 2);
     final int numPartitions = partitioner.numPartitions();
     final long[] partitionLengths = new long[numPartitions];
-    final InputStream[] spillInputStreams = new FileInputStream[spills.length];
+    final InputStream[] spillInputStreams = new InputStream[spills.length];
 
+    final OutputStream bos = new BufferedOutputStream(
+            new FileOutputStream(outputFile),
+            (int) sparkConf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") * 1024);
      // Use a counting output stream to avoid having to close the underlying file and ask
      // the file system for its size after each partition is written.
-    final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(
-      new FileOutputStream(outputFile));
+    final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos);
+    final int inputBufferSizeInBytes = (int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
 
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputStreams[i] = new FileInputStream(spills[i].file);
+        spillInputStreams[i] = new NioBufferedFileInputStream(
+            spills[i].file,
+            inputBufferSizeInBytes);
       }
       for (int partition = 0; partition < numPartitions; partition++) {
         final long initialFileLength = mergedFileOutputStream.getByteCount();
-        // Shield the underlying output stream from close() calls, so that we can close the higher
+        // Shield the underlying output stream from close() and flush() calls, so that we can close the higher
         // level streams to make sure all data is really flushed and internal state is cleaned.
-        OutputStream partitionOutput = new CloseShieldOutputStream(
+        OutputStream partitionOutput = new CloseAndFlushShieldOutputStream(
           new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream));
         partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
         if (compressionCodec != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
