Repository: spark Updated Branches: refs/heads/master a3c7b4187 -> bf665a958
[SPARK-15958] Make initial buffer size for the Sorter configurable ## What changes were proposed in this pull request? Currently the initial buffer size in the sorter is hard-coded and is too small for large workloads. As a result, the sorter spends significant time expanding the buffer size and copying the data. It would be useful to make it configurable. ## How was this patch tested? Tested by running a job on the cluster. Author: Sital Kedia <ske...@fb.com> Closes #13699 from sitalkedia/config_sort_buffer_upstream. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf665a95 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf665a95 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf665a95 Branch: refs/heads/master Commit: bf665a958631125a1670504ef5966ef1a0e14798 Parents: a3c7b41 Author: Sital Kedia <ske...@fb.com> Authored: Sat Jun 25 09:13:39 2016 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Sat Jun 25 09:13:39 2016 +0100 ---------------------------------------------------------------------- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 7 +++++-- .../apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 4 ++-- .../apache/spark/sql/execution/UnsafeExternalRowSorter.java | 4 +++- .../apache/spark/sql/execution/UnsafeKVExternalSorter.java | 7 +++++-- 4 files changed, 15 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bf665a95/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index daa63d4..05fa04c 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java 
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -61,7 +61,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object(); @VisibleForTesting - static final int INITIAL_SORT_BUFFER_SIZE = 4096; + static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; private final BlockManager blockManager; private final IndexShuffleBlockResolver shuffleBlockResolver; @@ -74,6 +74,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { private final TaskContext taskContext; private final SparkConf sparkConf; private final boolean transferToEnabled; + private final int initialSortBufferSize; @Nullable private MapStatus mapStatus; @Nullable private ShuffleExternalSorter sorter; @@ -122,6 +123,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { this.taskContext = taskContext; this.sparkConf = sparkConf; this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); + this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize", + DEFAULT_INITIAL_SORT_BUFFER_SIZE); open(); } @@ -187,7 +190,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { memoryManager, blockManager, taskContext, - INITIAL_SORT_BUFFER_SIZE, + initialSortBufferSize, partitioner.numPartitions(), sparkConf, writeMetrics); http://git-wip-us.apache.org/repos/asf/spark/blob/bf665a95/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 7dd61f8..daeb467 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -413,10 +413,10 @@ public class UnsafeShuffleWriterSuite { } private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception { - memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16); + memoryManager.limit(UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE * 16); final UnsafeShuffleWriter<Object, Object> writer = createWriter(false); final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); - for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE + 1; i++) { + for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) { dataToWrite.add(new Tuple2<Object, Object>(i, i)); } writer.write(dataToWrite.iterator()); http://git-wip-us.apache.org/repos/asf/spark/blob/bf665a95/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index ad76bf5..0b177ad 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -38,6 +38,7 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterIterator; public final class UnsafeExternalRowSorter { + static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; /** * If positive, forces records to be spilled to disk at the given frequency (measured in numbers * of records). This is only intended to be used in tests. 
@@ -85,7 +86,8 @@ public final class UnsafeExternalRowSorter { taskContext, new RowComparator(ordering, schema.length()), prefixComparator, - /* initialSize */ 4096, + sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize", + DEFAULT_INITIAL_SORT_BUFFER_SIZE), pageSizeBytes, canUseRadixSort ); http://git-wip-us.apache.org/repos/asf/spark/blob/bf665a95/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 99fe51d..b1cc523 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.google.common.annotations.VisibleForTesting; +import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.serializer.SerializerManager; @@ -86,7 +87,8 @@ public final class UnsafeKVExternalSorter { taskContext, recordComparator, prefixComparator, - /* initialSize */ 4096, + SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize", + UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE), pageSizeBytes, canUseRadixSort); } else { @@ -131,7 +133,8 @@ public final class UnsafeKVExternalSorter { taskContext, new KVComparator(ordering, keySchema.length()), prefixComparator, - /* initialSize */ 4096, + SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize", + UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE), pageSizeBytes, inMemSorter); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org