spark git commit: [SPARK-13850] Force the sorter to Spill when number of elements in th…

2016-06-30 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 98056a1f8 -> f17ffef38


[SPARK-13850] Force the sorter to Spill when number of elements in th…

Force the sorter to Spill when the number of elements in the pointer array reaches a 
certain size. This is to work around the issue of TimSort failing on large 
buffer sizes.

Tested by running a job which was failing without this change due to TimSort 
bug.

Author: Sital Kedia 

Closes #13107 from sitalkedia/fix_TimSort.

(cherry picked from commit 07f46afc733b1718d528a6ea5c0d774f047024fa)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f17ffef3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f17ffef3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f17ffef3

Branch: refs/heads/branch-2.0
Commit: f17ffef38b4749b6b801c198ec207434a4db0c38
Parents: 98056a1
Author: Sital Kedia 
Authored: Thu Jun 30 10:53:18 2016 -0700
Committer: Davies Liu 
Committed: Thu Jun 30 10:54:37 2016 -0700

--
 .../shuffle/sort/ShuffleExternalSorter.java | 10 ++---
 .../unsafe/sort/UnsafeExternalSorter.java   | 23 +---
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  3 +++
 .../sql/execution/UnsafeExternalRowSorter.java  |  2 ++
 .../UnsafeFixedWidthAggregationMap.java |  3 +++
 .../sql/execution/UnsafeKVExternalSorter.java   |  8 +--
 .../apache/spark/sql/execution/WindowExec.scala |  2 ++
 .../execution/datasources/WriterContainer.scala |  5 -
 .../execution/joins/CartesianProductExec.scala  |  2 ++
 .../execution/streaming/FileStreamSink.scala|  5 -
 .../execution/UnsafeKVExternalSorterSuite.scala |  4 +++-
 .../spark/sql/hive/hiveWriterContainers.scala   |  5 -
 12 files changed, 60 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f17ffef3/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 014aef8..696ee73 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskContext taskContext;
   private final ShuffleWriteMetrics writeMetrics;
 
-  /** Force this sorter to spill when there are this many elements in memory. 
For testing only */
+  /**
+   * Force this sorter to spill when there are this many elements in memory. 
The default value is
+   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to 
be 8G.
+   */
   private final long numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no 
units are provided
 this.fileBufferSizeBytes = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
 this.numElementsForSpillThreshold =
-  conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 
Long.MAX_VALUE);
+  conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 
* 1024 * 1024);
 this.writeMetrics = writeMetrics;
 this.inMemSorter = new ShuffleInMemorySorter(
   this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", 
true));
@@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 
 // for tests
 assert(inMemSorter != null);
-if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+  logger.info("Spilling data because number of spilledRecords crossed the 
threshold " + numElementsForSpillThreshold);
   spill();
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f17ffef3/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index e14a23f..8a980d4 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;

spark git commit: [SPARK-13850] Force the sorter to Spill when number of elements in th…

2016-06-30 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 5344bade8 -> 07f46afc7


[SPARK-13850] Force the sorter to Spill when number of elements in th…

## What changes were proposed in this pull request?

Force the sorter to Spill when the number of elements in the pointer array reaches a 
certain size. This is to work around the issue of TimSort failing on large 
buffer sizes.

## How was this patch tested?

Tested by running a job which was failing without this change due to TimSort 
bug.

Author: Sital Kedia 

Closes #13107 from sitalkedia/fix_TimSort.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f46afc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f46afc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f46afc

Branch: refs/heads/master
Commit: 07f46afc733b1718d528a6ea5c0d774f047024fa
Parents: 5344bad
Author: Sital Kedia 
Authored: Thu Jun 30 10:53:18 2016 -0700
Committer: Davies Liu 
Committed: Thu Jun 30 10:53:18 2016 -0700

--
 .../shuffle/sort/ShuffleExternalSorter.java | 10 ++---
 .../unsafe/sort/UnsafeExternalSorter.java   | 23 +---
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  3 +++
 .../sql/execution/UnsafeExternalRowSorter.java  |  2 ++
 .../UnsafeFixedWidthAggregationMap.java |  3 +++
 .../sql/execution/UnsafeKVExternalSorter.java   |  8 +--
 .../apache/spark/sql/execution/WindowExec.scala |  2 ++
 .../execution/datasources/WriterContainer.scala |  5 -
 .../execution/joins/CartesianProductExec.scala  |  2 ++
 .../execution/streaming/FileStreamSink.scala|  5 -
 .../execution/UnsafeKVExternalSorterSuite.scala |  4 +++-
 .../spark/sql/hive/hiveWriterContainers.scala   |  5 -
 12 files changed, 60 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 014aef8..696ee73 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskContext taskContext;
   private final ShuffleWriteMetrics writeMetrics;
 
-  /** Force this sorter to spill when there are this many elements in memory. 
For testing only */
+  /**
+   * Force this sorter to spill when there are this many elements in memory. 
The default value is
+   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to 
be 8G.
+   */
   private final long numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no 
units are provided
 this.fileBufferSizeBytes = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
 this.numElementsForSpillThreshold =
-  conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 
Long.MAX_VALUE);
+  conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 
* 1024 * 1024);
 this.writeMetrics = writeMetrics;
 this.inMemSorter = new ShuffleInMemorySorter(
   this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", 
true));
@@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 
 // for tests
 assert(inMemSorter != null);
-if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+  logger.info("Spilling data because number of spilledRecords crossed the 
threshold " + numElementsForSpillThreshold);
   spill();
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ec15f0b..d6a255e 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org