spark git commit: [SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 947462f5a -> 47a668c2f

[SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue

JVMs can't allocate arrays of length exactly Int.MaxValue, so ensure we never try to allocate an array that big. This commit changes some defaults & configs to gracefully fall back to something that doesn't require one large array in some cases; in other cases it simply improves the error message for cases that will still fail.

Closes #22818 from squito/SPARK-25827.

Authored-by: Imran Rashid
Signed-off-by: Imran Rashid
(cherry picked from commit 8fbc1830f962c446b915d0d8ff2b13c5c75d22fc)

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47a668c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47a668c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47a668c2

Branch: refs/heads/branch-2.4
Commit: 47a668c2f03d77078259531ddaccf80b001a8b5c
Parents: 947462f
Author: Imran Rashid
Authored: Wed Nov 7 13:18:52 2018 +0100
Committer: Imran Rashid
Committed: Thu Nov 8 15:49:19 2018 +0100

----------------------------------------------------------------------
 .../org/apache/spark/internal/config/package.scala | 17 ++---
 .../scala/org/apache/spark/storage/DiskStore.scala |  6 --
 .../apache/spark/storage/memory/MemoryStore.scala  |  7 ---
 .../apache/spark/util/io/ChunkedByteBuffer.scala   |  2 +-
 .../org/apache/spark/mllib/linalg/Matrices.scala   | 13 +++--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 +++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  7 +++
 7 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5836d27..e2162db 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -387,8 +387,9 @@ package object config {
       .internal()
       .doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
-        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
       .createWithDefault(64 * 1024 * 1024)
 
   private[spark] val CHECKPOINT_COMPRESS =
@@ -459,8 +460,9 @@ package object config {
       "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
       "made in creating intermediate shuffle files.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The file buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
@@ -468,8 +470,9 @@ package object config {
       .doc("The file system for this buffer size after each partition " +
         "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -580,7 +583,7 @@ package object config {
       .internal()
       .doc("For testing only, controls the size of chunks when memory mapping a file")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   private[spark] val BARRIER_SYNC_TIMEOUT =
     ConfigBuilder("spark.barrier.sync.timeout")

http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/
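For context on the limit the commit works around: HotSpot JVMs reserve a few words per object for the array header, so allocating an array of length exactly Int.MaxValue fails with "Requested array size exceeds VM limit" even when the heap is large enough. Spark's ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH is defined as Int.MaxValue - 15 to stay under that limit. Below is a minimal Scala sketch of the idea; SafeArrayAlloc and allocateBytes are illustrative names, not Spark APIs.

    // Illustrative sketch; mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH.
    object SafeArrayAlloc {
      // Int.MaxValue - 15 leaves room for the JVM's array object header.
      val MaxRoundedArrayLength: Int = Int.MaxValue - 15

      def allocateBytes(requested: Long): Array[Byte] = {
        require(requested >= 0 && requested <= MaxRoundedArrayLength,
          s"Cannot allocate $requested bytes: JVM array lengths must stay " +
            s"at or below $MaxRoundedArrayLength (Int.MaxValue - 15).")
        new Array[Byte](requested.toInt)
      }
    }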
spark git commit: [SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue
Repository: spark
Updated Branches:
  refs/heads/master 9e9fa2f69 -> 8fbc1830f

[SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue

JVMs can't allocate arrays of length exactly Int.MaxValue, so ensure we never try to allocate an array that big. This commit changes some defaults & configs to gracefully fall back to something that doesn't require one large array in some cases; in other cases it simply improves the error message for cases that will still fail.

Closes #22818 from squito/SPARK-25827.

Authored-by: Imran Rashid
Signed-off-by: Imran Rashid

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fbc1830
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fbc1830
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fbc1830

Branch: refs/heads/master
Commit: 8fbc1830f962c446b915d0d8ff2b13c5c75d22fc
Parents: 9e9fa2f
Author: Imran Rashid
Authored: Wed Nov 7 13:18:52 2018 +0100
Committer: Imran Rashid
Committed: Wed Nov 7 13:18:52 2018 +0100

----------------------------------------------------------------------
 .../org/apache/spark/internal/config/package.scala | 17 ++---
 .../scala/org/apache/spark/storage/DiskStore.scala |  6 --
 .../apache/spark/storage/memory/MemoryStore.scala  |  7 ---
 .../apache/spark/util/io/ChunkedByteBuffer.scala   |  2 +-
 .../org/apache/spark/mllib/linalg/Matrices.scala   | 13 +++--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 +++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  7 +++
 7 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2b3ba3c..d346013 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -416,8 +416,9 @@ package object config {
       .internal()
       .doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
-        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
       .createWithDefault(64 * 1024 * 1024)
 
   private[spark] val CHECKPOINT_COMPRESS =
@@ -488,8 +489,9 @@ package object config {
       "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
       "made in creating intermediate shuffle files.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The file buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
@@ -497,8 +499,9 @@ package object config {
       .doc("The file system for this buffer size after each partition " +
         "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -610,7 +613,7 @@ package object config {
       .internal()
       .doc("For testing only, controls the size of chunks when memory mapping a file")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   private[spark] val BARRIER_SYNC_TIMEOUT =
     ConfigBuilder("spark.barrier.sync.timeout")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/
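To sanity-check the arithmetic in the KiB-denominated checks above: a buffer of v KiB is eventually backed by a byte array of v * 1024 elements, so v must not exceed MAX_ROUNDED_ARRAY_LENGTH / 1024 = (Int.MaxValue - 15) / 1024 = 2097151. A hedged stand-alone sketch of the same validation follows; checkFileBufferKiB is a hypothetical helper, not part of Spark's ConfigBuilder API.

    // Hypothetical stand-alone version of the checkValue predicate above.
    def checkFileBufferKiB(sizeKiB: Long): Unit = {
      // (Int.MaxValue - 15) / 1024 = 2097151, i.e. MAX_ROUNDED_ARRAY_LENGTH in KiB
      val maxKiB = (Int.MaxValue - 15) / 1024
      require(sizeKiB > 0 && sizeKiB <= maxKiB,
        s"The file buffer size must be greater than 0 and less than $maxKiB.")
    }

    checkFileBufferKiB(32)         // spark.shuffle.file.buffer=32k: accepted
    // checkFileBufferKiB(3145728) // 3g in KiB: throws IllegalArgumentException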