spark git commit: [SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue

2018-11-08 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 947462f5a -> 47a668c2f


[SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue

JVMs can't allocate arrays of length exactly Int.MaxValue, so ensure we never
try to allocate an array that big.  This commit changes some defaults & configs
to gracefully fall back to something that doesn't require one large array in
some cases; in other cases it simply improves the error message for cases that
will still fail.
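
For context: a typical JVM reserves a few object-header words when sizing an
array, so the practical maximum length is slightly below Int.MaxValue; the patch
therefore caps sizes at ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH (Int.MaxValue - 15)
rather than Int.MaxValue. A minimal standalone sketch of the idea follows (the
object and method names here are illustrative, not part of the patch):

object ArrayLimits {
  // Ceiling the patch relies on; mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH.
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  // Fail fast with a clear message instead of letting the JVM throw
  // "Requested array size exceeds VM limit" deep inside an allocation.
  def allocateBytes(requested: Long): Array[Byte] = {
    require(requested >= 0 && requested <= MaxRoundedArrayLength,
      s"Cannot allocate $requested bytes in a single array; " +
        s"the assumed JVM limit is $MaxRoundedArrayLength.")
    new Array[Byte](requested.toInt)
  }
}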

Closes #22818 from squito/SPARK-25827.

Authored-by: Imran Rashid 
Signed-off-by: Imran Rashid 
(cherry picked from commit 8fbc1830f962c446b915d0d8ff2b13c5c75d22fc)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47a668c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47a668c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47a668c2

Branch: refs/heads/branch-2.4
Commit: 47a668c2f03d77078259531ddaccf80b001a8b5c
Parents: 947462f
Author: Imran Rashid 
Authored: Wed Nov 7 13:18:52 2018 +0100
Committer: Imran Rashid 
Committed: Thu Nov 8 15:49:19 2018 +0100

--
 .../org/apache/spark/internal/config/package.scala | 17 ++---
 .../scala/org/apache/spark/storage/DiskStore.scala |  6 --
 .../apache/spark/storage/memory/MemoryStore.scala  |  7 ---
 .../apache/spark/util/io/ChunkedByteBuffer.scala   |  2 +-
 .../org/apache/spark/mllib/linalg/Matrices.scala   | 13 +++--
 .../org/apache/spark/sql/internal/SQLConf.scala|  6 +++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  7 +++
 7 files changed, 32 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5836d27..e2162db 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -387,8 +387,9 @@ package object config {
       .internal()
       .doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
-        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
       .createWithDefault(64 * 1024 * 1024)
 
   private[spark] val CHECKPOINT_COMPRESS =
@@ -459,8 +460,9 @@ package object config {
         "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
         "made in creating intermediate shuffle files.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The file buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
@@ -468,8 +470,9 @@ package object config {
       .doc("The file system for this buffer size after each partition " +
         "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -580,7 +583,7 @@ package object config {
       .internal()
       .doc("For testing only, controls the size of chunks when memory mapping a file")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   private[spark] val BARRIER_SYNC_TIMEOUT =
 ConfigBuilder("spark.barrier.sync.timeout")

http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
--
diff --git 

spark git commit: [SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue

2018-11-07 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master 9e9fa2f69 -> 8fbc1830f


[SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue

JVMs can't allocate arrays of length exactly Int.MaxValue, so ensure we never
try to allocate an array that big.  This commit changes some defaults & configs
to gracefully fall back to something that doesn't require one large array in
some cases; in other cases it simply improves the error message for cases that
will still fail.
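
The KiB-denominated configs in the diff below divide the cap by 1024 because the
configured value is later scaled to bytes when sizing an in-memory buffer, and
that byte count must still fit in a single array. A rough sketch of the bound
being enforced, assuming the same Int.MaxValue - 15 ceiling (names are
illustrative, not from the patch):

object BufferSizeCheck {
  // Same ceiling the patch uses for byte arrays.
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  // A value configured in KiB backs a buffer of sizeKiB * 1024 bytes,
  // so the KiB value itself must stay under MaxRoundedArrayLength / 1024.
  def validateBufferKiB(sizeKiB: Long): Long = {
    require(sizeKiB > 0 && sizeKiB <= MaxRoundedArrayLength / 1024,
      s"The buffer size must be greater than 0 and less than" +
        s" ${MaxRoundedArrayLength / 1024}.")
    sizeKiB
  }
}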

Closes #22818 from squito/SPARK-25827.

Authored-by: Imran Rashid 
Signed-off-by: Imran Rashid 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fbc1830
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fbc1830
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fbc1830

Branch: refs/heads/master
Commit: 8fbc1830f962c446b915d0d8ff2b13c5c75d22fc
Parents: 9e9fa2f
Author: Imran Rashid 
Authored: Wed Nov 7 13:18:52 2018 +0100
Committer: Imran Rashid 
Committed: Wed Nov 7 13:18:52 2018 +0100

--
 .../org/apache/spark/internal/config/package.scala | 17 ++---
 .../scala/org/apache/spark/storage/DiskStore.scala |  6 --
 .../apache/spark/storage/memory/MemoryStore.scala  |  7 ---
 .../apache/spark/util/io/ChunkedByteBuffer.scala   |  2 +-
 .../org/apache/spark/mllib/linalg/Matrices.scala   | 13 +++--
 .../org/apache/spark/sql/internal/SQLConf.scala|  6 +++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  7 +++
 7 files changed, 32 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2b3ba3c..d346013 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -416,8 +416,9 @@ package object config {
       .internal()
       .doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
-        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
       .createWithDefault(64 * 1024 * 1024)
 
   private[spark] val CHECKPOINT_COMPRESS =
@@ -488,8 +489,9 @@ package object config {
         "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
         "made in creating intermediate shuffle files.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The file buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
@@ -497,8 +499,9 @@ package object config {
       .doc("The file system for this buffer size after each partition " +
         "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -610,7 +613,7 @@ package object config {
       .internal()
       .doc("For testing only, controls the size of chunks when memory mapping a file")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   private[spark] val BARRIER_SYNC_TIMEOUT =
 ConfigBuilder("spark.barrier.sync.timeout")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala