Repository: spark
Updated Branches:
  refs/heads/master 5344bade8 -> 07f46afc7


[SPARK-13850] Force the sorter to Spill when number of elements in th…

## What changes were proposed in this pull request?

Force the sorter to spill when the number of elements in the pointer array reaches a
certain size. This works around the issue of TimSort failing on large
buffer sizes.

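As a usage note, the threshold is read from the Spark conf, so jobs hitting the TimSort issue can lower it without a rebuild. A minimal sketch, assuming a standalone driver (the property name and defaults come from this diff; the 16M value is purely illustrative, not a recommendation):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Lower the force-spill threshold so each sorter spills well before its
// pointer array reaches the sizes that trigger the TimSort bug.
// When unset, the defaults introduced in this patch apply (1024^3 for the
// shuffle sorter, 1024^3 / 2 for UnsafeExternalSorter).
val conf = new SparkConf()
  .setAppName("force-spill-example")
  .set("spark.shuffle.spill.numElementsForceSpillThreshold",
    (16 * 1024 * 1024).toString)

val sc = new SparkContext(conf)
```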
## How was this patch tested?

Tested by running a job that was failing without this change due to the TimSort
bug.

Author: Sital Kedia <ske...@fb.com>

Closes #13107 from sitalkedia/fix_TimSort.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f46afc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f46afc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f46afc

Branch: refs/heads/master
Commit: 07f46afc733b1718d528a6ea5c0d774f047024fa
Parents: 5344bad
Author: Sital Kedia <ske...@fb.com>
Authored: Thu Jun 30 10:53:18 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Thu Jun 30 10:53:18 2016 -0700

----------------------------------------------------------------------
 .../shuffle/sort/ShuffleExternalSorter.java     | 10 ++++++---
 .../unsafe/sort/UnsafeExternalSorter.java       | 23 +++++++++++++++++---
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  3 +++
 .../sql/execution/UnsafeExternalRowSorter.java  |  2 ++
 .../UnsafeFixedWidthAggregationMap.java         |  3 +++
 .../sql/execution/UnsafeKVExternalSorter.java   |  8 +++++--
 .../apache/spark/sql/execution/WindowExec.scala |  2 ++
 .../execution/datasources/WriterContainer.scala |  5 ++++-
 .../execution/joins/CartesianProductExec.scala  |  2 ++
 .../execution/streaming/FileStreamSink.scala    |  5 ++++-
 .../execution/UnsafeKVExternalSorterSuite.scala |  4 +++-
 .../spark/sql/hive/hiveWriterContainers.scala   |  5 ++++-
 12 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 014aef8..696ee73 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskContext taskContext;
   private final ShuffleWriteMetrics writeMetrics;
 
-  /** Force this sorter to spill when there are this many elements in memory. For testing only */
+  /**
+   * Force this sorter to spill when there are this many elements in memory. The default value is
+   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G.
+   */
   private final long numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
    this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
    this.numElementsForSpillThreshold =
-      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
+      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
    this.writeMetrics = writeMetrics;
    this.inMemSorter = new ShuffleInMemorySorter(
      this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
@@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 
     // for tests
     assert(inMemSorter != null);
-    if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+      logger.info("Spilling data because number of spilledRecords crossed the 
threshold " + numElementsForSpillThreshold);
       spill();
     }
 

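The core of the change is the guard added above; a hedged Scala sketch of the same logic (the method and parameter names here paraphrase the Java, and `spill` stands in for the MemoryConsumer spill call):

```scala
// Sketch of the pre-insert guard added to ShuffleExternalSorter.insertRecord:
// once the in-memory sorter holds `threshold` records, force a spill so the
// pointer array stays below the size that breaks TimSort.
def maybeForceSpill(numRecords: Long, threshold: Long)(spill: () => Unit): Unit = {
  if (numRecords >= threshold) { // the patch also relaxes > to >=
    spill()
  }
}
```

The same guard is repeated in UnsafeExternalSorter.insertRecord below, placed ahead of growPointerArrayIfNecessary() so the record count is bounded before the array would grow further.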
http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ec15f0b..d6a255e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
@@ -60,6 +61,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private final int fileBufferSizeBytes;
 
   /**
+   * Force this sorter to spill when there are this many elements in memory. The default value is
+   * 1024 * 1024 * 1024 / 2 which allows the maximum size of the pointer array to be 8G.
+   */
+  public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 / 2;
+
+  private final long numElementsForSpillThreshold;
+  /**
    * Memory pages that hold the records being sorted. The pages in this list are freed when
    * spilling, although in principle we could recycle these pages across spills (on the other hand,
    * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
@@ -88,9 +96,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       UnsafeInMemorySorter inMemorySorter) throws IOException {
    UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
-      serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
+      serializerManager, taskContext, recordComparator, prefixComparator, initialSize, numElementsForSpillThreshold,
         pageSizeBytes, inMemorySorter, false /* ignored */);
     sorter.spill(Long.MAX_VALUE, sorter);
    // The external sorter will be used to insert records, in-memory sorter is not needed.
@@ -107,9 +116,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       boolean canUseRadixSort) {
    return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
-      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null,
+      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null,
       canUseRadixSort);
   }
 
@@ -122,6 +132,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       @Nullable UnsafeInMemorySorter existingInMemorySorter,
       boolean canUseRadixSort) {
     super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -143,6 +154,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       this.inMemSorter = existingInMemorySorter;
     }
     this.peakMemoryUsedBytes = getMemoryUsage();
+    this.numElementsForSpillThreshold = numElementsForSpillThreshold;
 
     // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
     // the end of the task. This is necessary to avoid memory leaks in when the downstream operator
@@ -373,6 +385,12 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       Object recordBase, long recordOffset, int length, long prefix, boolean prefixIsNull)
     throws IOException {
 
+    assert(inMemSorter != null);
+    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+      logger.info("Spilling data because number of spilledRecords crossed the 
threshold " + numElementsForSpillThreshold);
+      spill();
+    }
+
     growPointerArrayIfNecessary();
     // Need 4 bytes to store the record length.
     final int required = length + 4;
@@ -384,7 +402,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     pageCursor += 4;
     Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
     pageCursor += length;
-    assert(inMemSorter != null);
     inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
   }
 

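Since UnsafeExternalSorter.create and its constructor now take the threshold explicitly, every caller threads it through. A sketch of the pattern the SQL-side hunks below repeat (assuming a live SparkEnv; the remaining create() arguments are elided into comments):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

// Resolve the force-spill threshold once, falling back to the new constant.
val numElementsForSpillThreshold: Long = SparkEnv.get.conf.getLong(
  "spark.shuffle.spill.numElementsForceSpillThreshold",
  UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)

// ...then pass it wherever a sorter is built, e.g.:
// UnsafeExternalSorter.create(taskMemoryManager, blockManager, serializerManager,
//   taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes,
//   numElementsForSpillThreshold, canUseRadixSort)
```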
http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index bce958c..3ea9923 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -176,6 +176,7 @@ public class UnsafeExternalSorterSuite {
       prefixComparator,
       /* initialSize */ 1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
   }
 
@@ -399,6 +400,7 @@ public class UnsafeExternalSorterSuite {
       null,
       /* initialSize */ 1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
     long[] record = new long[100];
     int recordSize = record.length * 8;
@@ -435,6 +437,7 @@ public class UnsafeExternalSorterSuite {
       prefixComparator,
       1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
 
     // Peak memory should be monotonically increasing. More specifically, every time

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 0b177ad..b4e87c3 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -89,6 +89,8 @@ public final class UnsafeExternalRowSorter {
       sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
                              DEFAULT_INITIAL_SORT_BUFFER_SIZE),
       pageSizeBytes,
+      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+          .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       canUseRadixSort
     );
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 1f1b538..3705291 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
 
 /**
 * Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -246,6 +247,8 @@ public final class UnsafeFixedWidthAggregationMap {
       SparkEnv.get().blockManager(),
       SparkEnv.get().serializerManager(),
       map.getPageSizeBytes(),
+      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+          .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       map);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b1cc523..daa948f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -55,8 +55,9 @@ public final class UnsafeKVExternalSorter {
       StructType valueSchema,
       BlockManager blockManager,
       SerializerManager serializerManager,
-      long pageSizeBytes) throws IOException {
-    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
+      long pageSizeBytes,
+      long numElementsForSpillThreshold) throws IOException {
+    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, numElementsForSpillThreshold, null);
   }
 
   public UnsafeKVExternalSorter(
@@ -65,6 +66,7 @@ public final class UnsafeKVExternalSorter {
       BlockManager blockManager,
       SerializerManager serializerManager,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
     this.valueSchema = valueSchema;
@@ -90,6 +92,7 @@ public final class UnsafeKVExternalSorter {
         SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
                                     UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
+        numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
      // The array will be used to do in-place sort, which require half of the space to be empty.
@@ -136,6 +139,7 @@ public final class UnsafeKVExternalSorter {
         SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
                                     UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
+        numElementsForSpillThreshold,
         inMemSorter);
 
      // reset the map, so we can re-use it to insert new records. the inMemSorter will not used

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
index 1b9634c..93f007f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
@@ -345,6 +345,8 @@ case class WindowExec(
                   null,
                   1024,
                   SparkEnv.get.memoryManager.pageSizeBytes,
+                  SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+                    UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
                   false)
                 rows.foreach { r =>
                   sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f56b50a..9a0b46c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 
 /** A container for all the details required when writing to a table. */
@@ -389,7 +390,9 @@ private[sql] class DynamicPartitionWriterContainer(
       StructType.fromAttributes(dataColumns),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes)
+      TaskContext.get().taskMemoryManager().pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
     while (iterator.hasNext) {
       val currentRow = iterator.next()

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index d870d91..0553086 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -49,6 +49,8 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
       null,
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       false)
 
     val partition = split.asInstanceOf[CartesianPartition]

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index efb0491..117d667 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -209,7 +210,9 @@ class FileStreamSinkWriter(
       StructType.fromAttributes(writeColumns),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes)
+      TaskContext.get().taskMemoryManager().pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
     while (iterator.hasNext) {
       val currentRow = iterator.next()

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 03d4be8..3d869c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
 * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -123,7 +124,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       metricsSystem = null))
 
     val sorter = new UnsafeKVExternalSorter(
-      keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager, pageSize)
+      keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
+      pageSize, UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)
 
     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 794fe26..e65c24e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -280,7 +281,9 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
         StructType.fromAttributes(dataOutput),
         SparkEnv.get.blockManager,
         SparkEnv.get.serializerManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes)
+        TaskContext.get().taskMemoryManager().pageSizeBytes,
+        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
       while (iterator.hasNext) {
         val inputRow = iterator.next()

