Repository: spark
Updated Branches:
  refs/heads/branch-2.4 9c0c6d4d5 -> 1001d2314


[SPARK-25704][CORE] Allocate a bit less than Int.MaxValue

JVMs don't let you allocate arrays of length exactly Int.MaxValue, so leave
a little extra room.  This is necessary when reading blocks >2GB off
the network (for remote reads or for cache replication).
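
A minimal sketch of the idea (not part of this commit; the object and
method names are illustrative, not Spark's API). The margin matches
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH (Int.MaxValue - 15), which this
patch adopts as the chunk-size cap:

    // JVM implementations reserve a few header words per array, so
    // new Array[Byte](Int.MaxValue) can fail with "Requested array size
    // exceeds VM limit" even with plenty of free heap. Capping chunks
    // slightly below Int.MaxValue sidesteps that.
    object ChunkSizingSketch {
      val MaxRoundedArrayLength: Int = Int.MaxValue - 15

      // Split a possibly >2GB byte count into safely allocatable chunk sizes.
      def chunkSizes(totalLength: Long): Seq[Int] = {
        require(totalLength >= 0, "length must be non-negative")
        Iterator.iterate(totalLength)(_ - MaxRoundedArrayLength)
          .takeWhile(_ > 0)
          .map(remaining => math.min(remaining, MaxRoundedArrayLength.toLong).toInt)
          .toSeq
      }
    }

    // Example: a 3GB block becomes two allocatable chunks instead of one
    // impossible allocation: chunkSizes(3L << 30) == Seq(2147483632, 1073741840)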

Unit tests via Jenkins; also ran a test with blocks over 2GB on a cluster.

Closes #22705 from squito/SPARK-25704.

Authored-by: Imran Rashid <iras...@cloudera.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1001d231
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1001d231
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1001d231

Branch: refs/heads/branch-2.4
Commit: 1001d2314275c902da519725da266a23b537e33a
Parents: 9c0c6d4
Author: Imran Rashid <iras...@cloudera.com>
Authored: Fri Oct 19 12:52:41 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Fri Oct 19 12:54:08 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala     |  6 ++----
 .../apache/spark/util/io/ChunkedByteBuffer.scala    | 16 +++++++++-------
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1001d231/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0fe82ac..c01a453 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -133,8 +133,6 @@ private[spark] class BlockManager(
 
   private[spark] val externalShuffleServiceEnabled =
     conf.get(config.SHUFFLE_SERVICE_ENABLED)
-  private val chunkSize =
-    conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
   private val remoteReadNioBufferConversion =
     conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)
 
@@ -451,7 +449,7 @@ private[spark] class BlockManager(
            new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
 
           case None =>
-            ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+            ChunkedByteBuffer.fromFile(tmpFile)
         }
         putBytes(blockId, buffer, level)(classTag)
         tmpFile.delete()
@@ -797,7 +795,7 @@ private[spark] class BlockManager(
         if (remoteReadNioBufferConversion) {
           return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
         } else {
-          return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
+          return Some(ChunkedByteBuffer.fromManagedBuffer(data))
         }
       }
       logDebug(s"The value of block $blockId is null")

http://git-wip-us.apache.org/repos/asf/spark/blob/1001d231/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 4aa8d45..9547cb4 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
 import org.apache.spark.storage.StorageUtils
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
 /**
@@ -169,24 +170,25 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 
 }
 
-object ChunkedByteBuffer {
+private[spark] object ChunkedByteBuffer {
+
+
   // TODO eliminate this method if we switch BlockManager to getting InputStreams
-  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
+  def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
-        fromFile(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+        fromFile(f.getFile, f.getOffset, f.getLength)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
   }
 
-  def fromFile(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
-    fromFile(file, maxChunkSize, 0, file.length())
+  def fromFile(file: File): ChunkedByteBuffer = {
+    fromFile(file, 0, file.length())
   }
 
   private def fromFile(
       file: File,
-      maxChunkSize: Int,
       offset: Long,
       length: Long): ChunkedByteBuffer = {
     // We do *not* memory map the file, because we may end up putting this into the memory store,
@@ -195,7 +197,7 @@ object ChunkedByteBuffer {
     val is = new FileInputStream(file)
     ByteStreams.skipFully(is, offset)
     val in = new LimitedInputStream(is, length)
-    val chunkSize = math.min(maxChunkSize, length).toInt
+    val chunkSize = math.min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, length).toInt
     val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate _)
     Utils.tryWithSafeFinally {
       IOUtils.copy(in, out)
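
For reference, a hedged usage sketch of the simplified entry point after
this change (the file path is illustrative; since ChunkedByteBuffer is now
private[spark], this only compiles inside Spark's own packages). Callers no
longer pass a max chunk size; the cap comes from
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH internally:

    import java.io.File
    import org.apache.spark.util.io.ChunkedByteBuffer

    val file = new File("/tmp/block-data")  // illustrative path
    // Chunk size is chosen internally as min(Int.MaxValue - 15, file length),
    // so blocks larger than ~2GB span several ByteBuffer chunks instead of
    // requiring a single near-Int.MaxValue array allocation.
    val buffer: ChunkedByteBuffer = ChunkedByteBuffer.fromFile(file)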

