Repository: spark
Updated Branches:
  refs/heads/master 9e9fa2f69 -> 8fbc1830f


[SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue

JVMs can't allocate arrays of length exactly Int.MaxValue, so ensure we never 
try to allocate an array that big.  This commit changes some defaults & configs 
to gracefully fall back to something that doesn't require one large array in 
some cases; in other cases it simply improves the error message for operations 
that will still fail.

Closes #22818 from squito/SPARK-25827.

Authored-by: Imran Rashid <iras...@cloudera.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fbc1830
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fbc1830
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fbc1830

Branch: refs/heads/master
Commit: 8fbc1830f962c446b915d0d8ff2b13c5c75d22fc
Parents: 9e9fa2f
Author: Imran Rashid <iras...@cloudera.com>
Authored: Wed Nov 7 13:18:52 2018 +0100
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Wed Nov 7 13:18:52 2018 +0100

----------------------------------------------------------------------
 .../org/apache/spark/internal/config/package.scala | 17 ++++++++++-------
 .../scala/org/apache/spark/storage/DiskStore.scala |  6 ++++--
 .../apache/spark/storage/memory/MemoryStore.scala  |  7 ++++---
 .../apache/spark/util/io/ChunkedByteBuffer.scala   |  2 +-
 .../org/apache/spark/mllib/linalg/Matrices.scala   | 13 +++++++------
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 +++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  7 +++----
 7 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2b3ba3c..d346013 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -416,8 +416,9 @@ package object config {
       .internal()
       .doc("The chunk size in bytes during writing out the bytes of 
ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the 
bytes of" +
-        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
       .createWithDefault(64 * 1024 * 1024)
 
   private[spark] val CHECKPOINT_COMPRESS =
@@ -488,8 +489,9 @@ package object config {
         "otherwise specified. These buffers reduce the number of disk seeks 
and system calls " +
         "made in creating intermediate shuffle files.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The file buffer size must be greater than 0 and less than 
${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH 
/ 1024,
+        s"The file buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
@@ -497,8 +499,9 @@ package object config {
       .doc("The file system for this buffer size after each partition " +
         "is written in unsafe shuffle writer. In KiB unless otherwise 
specified.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue 
/ 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH 
/ 1024,
+        s"The buffer size must be greater than 0 and less than" +
+          s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -610,7 +613,7 @@ package object config {
       .internal()
       .doc("For testing only, controls the size of chunks when memory mapping 
a file")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   private[spark] val BARRIER_SYNC_TIMEOUT =
     ConfigBuilder("spark.barrier.sync.timeout")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 841e16a..29963a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
 import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer
 
@@ -217,7 +218,7 @@ private[spark] class EncryptedBlockData(
       var remaining = blockSize
       val chunks = new ListBuffer[ByteBuffer]()
       while (remaining > 0) {
-        val chunkSize = math.min(remaining, Int.MaxValue)
+        val chunkSize = math.min(remaining, 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
         val chunk = allocator(chunkSize.toInt)
         remaining -= chunkSize
         JavaUtils.readFully(source, chunk)
@@ -235,7 +236,8 @@ private[spark] class EncryptedBlockData(
     // This is used by the block transfer service to replicate blocks. The 
upload code reads
     // all bytes into memory to send the block to the remote executor, so it's 
ok to do this
     // as long as the block fits in a Java array.
-    assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a 
byte buffer.")
+    assert(blockSize <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "Block is too large to be wrapped in a byte buffer.")
     val dst = ByteBuffer.allocate(blockSize.toInt)
     val in = open()
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 06fd56e..8513359 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -34,6 +34,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage._
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
@@ -333,11 +334,11 @@ private[spark] class MemoryStore(
 
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
-    val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
+    val chunkSize = if (initialMemoryThreshold > 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
       logWarning(s"Initial memory threshold of 
${Utils.bytesToString(initialMemoryThreshold)} " +
         s"is too large to be set as chunk size. Chunk size has been capped to 
" +
-        s"${Utils.bytesToString(Int.MaxValue)}")
-      Int.MaxValue
+        s"${Utils.bytesToString(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)}")
+      ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
     } else {
       initialMemoryThreshold.toInt
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index da2be84..870830f 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -97,7 +97,7 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
    * @throws UnsupportedOperationException if this buffer's size exceeds the 
maximum array size.
    */
   def toArray: Array[Byte] = {
-    if (size >= Integer.MAX_VALUE) {
+    if (size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
       throw new UnsupportedOperationException(
         s"cannot call toArray because buffer size ($size bytes) exceeds 
maximum array size")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index bf9b4cf..e474cfa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -30,6 +30,7 @@ import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeArrayData}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.array.ByteArrayMethods
 
 /**
  * Trait for a local matrix.
@@ -456,7 +457,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def zeros(numRows: Int, numCols: Int): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
             s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, new Array[Double](numRows * numCols))
   }
@@ -469,7 +470,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def ones(numRows: Int, numCols: Int): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
             s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0))
   }
@@ -499,7 +500,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
             s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * 
numCols)(rng.nextDouble()))
   }
@@ -513,7 +514,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
             s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * 
numCols)(rng.nextGaussian()))
   }
@@ -846,8 +847,8 @@ object SparseMatrix {
       s"density must be a double in the range 0.0 <= d <= 1.0. Currently, 
density: $density")
     val size = numRows.toLong * numCols
     val expected = size * density
-    assert(expected < Int.MaxValue,
-      "The expected number of nonzeros cannot be greater than Int.MaxValue.")
+    assert(expected < ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "The expected number of nonzeros cannot be greater than Int.MaxValue - 
15.")
     val nnz = math.ceil(expected).toInt
     if (density == 0.0) {
       new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), 
Array.empty, Array.empty)

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fa59fa5..518115d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -27,7 +27,6 @@ import scala.collection.immutable
 import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
-import org.tukaani.xz.LZMA2Options
 
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
@@ -36,6 +35,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
 
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1246,7 +1246,7 @@ object SQLConf {
       .doc("Threshold for number of rows guaranteed to be held in memory by 
the sort merge " +
         "join operator")
       .intConf
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
@@ -1480,7 +1480,7 @@ object SQLConf {
           "'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, 
do a top-K sort" +
           " in memory, otherwise do a global sort which spills to disk if 
necessary.")
       .intConf
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"

http://git-wip-us.apache.org/repos/asf/spark/blob/8fbc1830/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c91b0d7..d534005 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql
 
 import java.io.CharArrayWriter
-import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -46,7 +45,6 @@ import org.apache.spark.sql.catalyst.parser.{ParseException, 
ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
PartitioningCollection}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
 import org.apache.spark.sql.execution.command._
@@ -57,6 +55,7 @@ import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils
 
@@ -287,7 +286,7 @@ class Dataset[T] private[sql](
       _numRows: Int,
       truncate: Int = 20,
       vertical: Boolean = false): String = {
-    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val numRows = 
_numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
     // Get rows represented by Seq[Seq[String]], we may get one more line if 
it has more data.
     val tmpRows = getRows(numRows, truncate)
 
@@ -3264,7 +3263,7 @@ class Dataset[T] private[sql](
       _numRows: Int,
       truncate: Int): Array[Any] = {
     EvaluatePython.registerPicklers()
-    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val numRows = 
_numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
     val rows = getRows(numRows, truncate).map(_.toArray).toArray
     val toJava: (Any) => Any = EvaluatePython.toJava(_, 
ArrayType(ArrayType(StringType)))
     val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to