Repository: spark
Updated Branches:
  refs/heads/master a3c7b4187 -> bf665a958


[SPARK-15958] Make initial buffer size for the Sorter configurable

## What changes were proposed in this pull request?

Currently the initial buffer size in the sorter is hard-coded and is too small 
for large workloads. As a result, the sorter spends significant time expanding 
the buffer and copying the data. It would be useful to make it configurable; 
this patch reads it from `spark.shuffle.sort.initialBufferSize` (default 4096).

## How was this patch tested?

Tested by running a job on the cluster.

Author: Sital Kedia <ske...@fb.com>

Closes #13699 from sitalkedia/config_sort_buffer_upstream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf665a95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf665a95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf665a95

Branch: refs/heads/master
Commit: bf665a958631125a1670504ef5966ef1a0e14798
Parents: a3c7b41
Author: Sital Kedia <ske...@fb.com>
Authored: Sat Jun 25 09:13:39 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Jun 25 09:13:39 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java    | 7 +++++--
 .../apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java   | 4 ++--
 .../apache/spark/sql/execution/UnsafeExternalRowSorter.java   | 4 +++-
 .../apache/spark/sql/execution/UnsafeKVExternalSorter.java    | 7 +++++--
 4 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf665a95/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index daa63d4..05fa04c 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -61,7 +61,7 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
   private static final ClassTag<Object> OBJECT_CLASS_TAG = 
ClassTag$.MODULE$.Object();
 
   @VisibleForTesting
-  static final int INITIAL_SORT_BUFFER_SIZE = 4096;
+  static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
 
   private final BlockManager blockManager;
   private final IndexShuffleBlockResolver shuffleBlockResolver;
@@ -74,6 +74,7 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
   private final TaskContext taskContext;
   private final SparkConf sparkConf;
   private final boolean transferToEnabled;
+  private final int initialSortBufferSize;
 
   @Nullable private MapStatus mapStatus;
   @Nullable private ShuffleExternalSorter sorter;
@@ -122,6 +123,8 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
     this.taskContext = taskContext;
     this.sparkConf = sparkConf;
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", 
true);
+    this.initialSortBufferSize = 
sparkConf.getInt("spark.shuffle.sort.initialBufferSize",
+                                                  
DEFAULT_INITIAL_SORT_BUFFER_SIZE);
     open();
   }
 
@@ -187,7 +190,7 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
       memoryManager,
       blockManager,
       taskContext,
-      INITIAL_SORT_BUFFER_SIZE,
+      initialSortBufferSize,
       partitioner.numPartitions(),
       sparkConf,
       writeMetrics);

http://git-wip-us.apache.org/repos/asf/spark/blob/bf665a95/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 7dd61f8..daeb467 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -413,10 +413,10 @@ public class UnsafeShuffleWriterSuite {
   }
 
   private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws 
Exception {
-    memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16);
+    memoryManager.limit(UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE * 
16);
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
-    for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE + 1; i++) 
{
+    for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 
1; i++) {
       dataToWrite.add(new Tuple2<Object, Object>(i, i));
     }
     writer.write(dataToWrite.iterator());

http://git-wip-us.apache.org/repos/asf/spark/blob/bf665a95/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index ad76bf5..0b177ad 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -38,6 +38,7 @@ import 
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterIterator;
 
 public final class UnsafeExternalRowSorter {
 
+  static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
   /**
    * If positive, forces records to be spilled to disk at the given frequency 
(measured in numbers
    * of records). This is only intended to be used in tests.
@@ -85,7 +86,8 @@ public final class UnsafeExternalRowSorter {
       taskContext,
       new RowComparator(ordering, schema.length()),
       prefixComparator,
-      /* initialSize */ 4096,
+      sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
+                             DEFAULT_INITIAL_SORT_BUFFER_SIZE),
       pageSizeBytes,
       canUseRadixSort
     );

http://git-wip-us.apache.org/repos/asf/spark/blob/bf665a95/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 99fe51d..b1cc523 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializerManager;
@@ -86,7 +87,8 @@ public final class UnsafeKVExternalSorter {
         taskContext,
         recordComparator,
         prefixComparator,
-        /* initialSize */ 4096,
+        SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
+                                     
UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
         canUseRadixSort);
     } else {
@@ -131,7 +133,8 @@ public final class UnsafeKVExternalSorter {
         taskContext,
         new KVComparator(ordering, keySchema.length()),
         prefixComparator,
-        /* initialSize */ 4096,
+        SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
+                                     
UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
         inMemSorter);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to