Repository: spark

Updated Branches:
  refs/heads/branch-2.4 947462f5a -> 47a668c2f
[SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue

JVMs can't allocate arrays of length exactly Int.MaxValue, so ensure we never
try to allocate an array that big. This commit changes some defaults & configs
to gracefully fall back to something that doesn't require one large array in
some cases; in other cases it simply improves an error message for cases which
will still fail.

Closes #22818 from squito/SPARK-25827.

Authored-by: Imran Rashid <iras...@cloudera.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>
(cherry picked from commit 8fbc1830f962c446b915d0d8ff2b13c5c75d22fc)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47a668c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47a668c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47a668c2

Branch: refs/heads/branch-2.4
Commit: 47a668c2f03d77078259531ddaccf80b001a8b5c
Parents: 947462f
Author: Imran Rashid <iras...@cloudera.com>
Authored: Wed Nov 7 13:18:52 2018 +0100
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu Nov 8 15:49:19 2018 +0100

----------------------------------------------------------------------
 .../org/apache/spark/internal/config/package.scala | 17 ++++++++++-------
 .../scala/org/apache/spark/storage/DiskStore.scala |  6 ++++--
 .../apache/spark/storage/memory/MemoryStore.scala  |  7 ++++---
 .../apache/spark/util/io/ChunkedByteBuffer.scala   |  2 +-
 .../org/apache/spark/mllib/linalg/Matrices.scala   | 13 +++++++------
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 +++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  7 +++----
 7 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5836d27..e2162db 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -387,8 +387,9 @@ package object config {
       .internal()
       .doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
-        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
       .createWithDefault(64 * 1024 * 1024)

   private[spark] val CHECKPOINT_COMPRESS =
@@ -459,8 +460,9 @@ package object config {
         "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
         "made in creating intermediate shuffle files.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The file buffer size must be greater than 0 and less than" +
+        s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")

   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
@@ -468,8 +470,9 @@ package object config {
       .doc("The file system for this buffer size after each partition " +
         "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The buffer size must be greater than 0 and less than" +
+        s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")

   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -580,7 +583,7 @@ package object config {
       .internal()
       .doc("For testing only, controls the size of chunks when memory mapping a file")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

   private[spark] val BARRIER_SYNC_TIMEOUT =
     ConfigBuilder("spark.barrier.sync.timeout")
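
For context on the constant that replaces Int.MaxValue throughout this diff: most
JVMs refuse to allocate an array of length exactly Int.MaxValue (the request fails
with an OutOfMemoryError along the lines of "Requested array size exceeds VM
limit"), so Spark caps allocations at ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
i.e. Int.MaxValue - 15. A minimal standalone sketch of that capping idea (not
Spark code; the names below are illustrative):

// Standalone sketch: the constant mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
// used throughout this commit.
object ArrayLimits {
  // Largest array length Spark is willing to request from the JVM.
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  // Clamp a requested byte count to a size that is safe to allocate as a
  // single Java array.
  def safeChunkSize(requested: Long): Int =
    math.min(requested, MaxRoundedArrayLength.toLong).toInt

  def main(args: Array[String]): Unit = {
    println(safeChunkSize(64L * 1024 * 1024))        // 67108864: small requests pass through
    println(safeChunkSize(3L * 1024 * 1024 * 1024))  // 2147483632: huge requests are capped
  }
}
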
http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 841e16a..29963a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
 import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer

@@ -217,7 +218,7 @@ private[spark] class EncryptedBlockData(
     var remaining = blockSize
     val chunks = new ListBuffer[ByteBuffer]()
     while (remaining > 0) {
-      val chunkSize = math.min(remaining, Int.MaxValue)
+      val chunkSize = math.min(remaining, ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
       val chunk = allocator(chunkSize.toInt)
       remaining -= chunkSize
       JavaUtils.readFully(source, chunk)
@@ -235,7 +236,8 @@ private[spark] class EncryptedBlockData(
     // This is used by the block transfer service to replicate blocks. The upload code reads
     // all bytes into memory to send the block to the remote executor, so it's ok to do this
     // as long as the block fits in a Java array.
-    assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
+    assert(blockSize <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "Block is too large to be wrapped in a byte buffer.")
     val dst = ByteBuffer.allocate(blockSize.toInt)
     val in = open()
     try {
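
The EncryptedBlockData change keeps the existing chunking loop and only lowers the
per-chunk cap. A self-contained sketch of the same pattern, limited to computing
which chunk sizes the loop would allocate (ChunkPlan and its names are illustrative,
not Spark code):

import scala.collection.mutable.ListBuffer

// Illustrative only: computes the chunk sizes the loop above would allocate
// for a block of the given size, without doing any I/O.
object ChunkPlan {
  val MaxChunk: Long = Int.MaxValue - 15  // mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH

  def chunkSizes(blockSize: Long): Seq[Int] = {
    val chunks = new ListBuffer[Int]()
    var remaining = blockSize
    while (remaining > 0) {
      val chunkSize = math.min(remaining, MaxChunk)
      chunks += chunkSize.toInt
      remaining -= chunkSize
    }
    chunks.toList
  }

  def main(args: Array[String]): Unit = {
    // A 5 GB block becomes two full-size chunks plus a remainder, each small
    // enough to back with one Array[Byte].
    println(chunkSizes(5L * 1024 * 1024 * 1024))  // List(2147483632, 2147483632, 1073741856)
  }
}
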
http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 06fd56e..8513359 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -34,6 +34,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage._
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -333,11 +334,11 @@ private[spark] class MemoryStore(
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
-    val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
+    val chunkSize = if (initialMemoryThreshold > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
       logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
         s"is too large to be set as chunk size. Chunk size has been capped to " +
-        s"${Utils.bytesToString(Int.MaxValue)}")
-      Int.MaxValue
+        s"${Utils.bytesToString(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)}")
+      ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
     } else {
       initialMemoryThreshold.toInt
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index da2be84..870830f 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -97,7 +97,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size.
    */
   def toArray: Array[Byte] = {
-    if (size >= Integer.MAX_VALUE) {
+    if (size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
       throw new UnsupportedOperationException(
         s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
     }
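
The toArray guard matters because collapsing a multi-chunk buffer into one
Array[Byte] needs a single allocation of the full size, and it is better to fail
with a clear message than with an opaque OutOfMemoryError. A hypothetical,
self-contained sketch of that pattern (ToArrayGuard is not a Spark class):

import java.nio.ByteBuffer

object ToArrayGuard {
  val MaxRoundedArrayLength: Long = Int.MaxValue - 15  // mirrors ByteArrayMethods

  // Copy a sequence of chunks into one array, failing fast with a clear error
  // if the combined size cannot fit in a single Java array.
  def toArray(chunks: Seq[ByteBuffer]): Array[Byte] = {
    val size = chunks.map(_.remaining().toLong).sum
    if (size >= MaxRoundedArrayLength) {
      throw new UnsupportedOperationException(
        s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
    }
    val out = new Array[Byte](size.toInt)
    var offset = 0
    chunks.foreach { chunk =>
      val copy = chunk.duplicate()  // leave the caller's buffer position untouched
      val n = copy.remaining()
      copy.get(out, offset, n)
      offset += n
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val chunks = Seq(ByteBuffer.wrap("hello ".getBytes), ByteBuffer.wrap("world".getBytes))
    println(new String(toArray(chunks)))  // hello world
  }
}
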
http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index bf9b4cf..e474cfa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -30,6 +30,7 @@ import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.array.ByteArrayMethods

 /**
  * Trait for a local matrix.
@@ -456,7 +457,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def zeros(numRows: Int, numCols: Int): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, new Array[Double](numRows * numCols))
   }
@@ -469,7 +470,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def ones(numRows: Int, numCols: Int): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0))
   }
@@ -499,7 +500,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rng.nextDouble()))
   }
@@ -513,7 +514,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rng.nextGaussian()))
   }
@@ -846,8 +847,8 @@ object SparseMatrix {
       s"density must be a double in the range 0.0 <= d <= 1.0. Currently, density: $density")
     val size = numRows.toLong * numCols
     val expected = size * density
-    assert(expected < Int.MaxValue,
-      "The expected number of nonzeros cannot be greater than Int.MaxValue.")
+    assert(expected < ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "The expected number of nonzeros cannot be greater than Int.MaxValue - 15.")
     val nnz = math.ceil(expected).toInt
     if (density == 0.0) {
       new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array.empty, Array.empty)
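
The require calls in the DenseMatrix factories do the size check in Long on
purpose: numRows * numCols evaluated in Int silently overflows. A small standalone
sketch of the same check (MatrixSizeCheck is illustrative, not Spark code):

object MatrixSizeCheck {
  val MaxRoundedArrayLength: Long = Int.MaxValue - 15  // mirrors ByteArrayMethods

  // Reject matrices whose element count cannot be backed by one Array[Double].
  def checkAllocatable(numRows: Int, numCols: Int): Unit = {
    require(numRows.toLong * numCols <= MaxRoundedArrayLength,
      s"$numRows x $numCols dense matrix is too large to allocate")
  }

  def main(args: Array[String]): Unit = {
    println(65536 * 65536)           // 0 -- silent Int overflow
    println(65536.toLong * 65536)    // 4294967296 -- the real element count
    checkAllocatable(1000, 1000)     // fine
    // checkAllocatable(65536, 65536) would throw IllegalArgumentException
  }
}
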
http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 08def90..99e601a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -27,7 +27,6 @@ import scala.collection.immutable
 import scala.util.matching.Regex

 import org.apache.hadoop.fs.Path
-import org.tukaani.xz.LZMA2Options

 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
@@ -36,6 +35,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils

 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1208,7 +1208,7 @@ object SQLConf {
     .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
       "join operator")
     .intConf
-    .createWithDefault(Int.MaxValue)
+    .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
@@ -1442,7 +1442,7 @@ object SQLConf {
       "'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort" +
       " in memory, otherwise do a global sort which spills to disk if necessary.")
     .intConf
-    .createWithDefault(Int.MaxValue)
+    .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

 object Deprecated {
   val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
http://git-wip-us.apache.org/repos/asf/spark/blob/47a668c2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fa14aa1..13e6a83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql

 import java.io.CharArrayWriter
-import java.sql.{Date, Timestamp}

 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -46,7 +45,6 @@ import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
@@ -57,6 +55,7 @@ import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils

@@ -287,7 +286,7 @@ class Dataset[T] private[sql](
       _numRows: Int,
       truncate: Int = 20,
       vertical: Boolean = false): String = {
-    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
     // Get rows represented by Seq[Seq[String]], we may get one more line if it has more data.
     val tmpRows = getRows(numRows, truncate)

@@ -3264,7 +3263,7 @@ class Dataset[T] private[sql](
       _numRows: Int,
       truncate: Int): Array[Any] = {
     EvaluatePython.registerPicklers()
-    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
     val rows = getRows(numRows, truncate).map(_.toArray).toArray
     val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
     val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
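
The Dataset change only tightens the clamp on caller-supplied row counts. A
hypothetical sketch of that clamp (RowClamp is not a Spark class); the "- 1"
plausibly leaves room for the one extra row that getRows fetches to detect whether
more data exists, per the comment in the hunk above:

object RowClamp {
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15  // mirrors ByteArrayMethods

  // Force a requested row count into a range where numRows + 1 still stays
  // below the rounded array limit.
  def clampNumRows(requested: Int): Int =
    requested.max(0).min(MaxRoundedArrayLength - 1)

  def main(args: Array[String]): Unit = {
    println(clampNumRows(-5))            // 0
    println(clampNumRows(20))            // 20
    println(clampNumRows(Int.MaxValue))  // 2147483631 (= Int.MaxValue - 16)
  }
}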