Repository: spark
Updated Branches:
  refs/heads/branch-2.4 544f86a69 -> f91247f81


[SPARK-25422][CORE] Don't memory map blocks streamed to disk.

After data has been streamed to disk, the buffers are inserted into the
memory store in some cases (eg., with broadcast blocks).  But broadcast
code also disposes of those buffers when the data has been read, to
ensure that we don't leave mapped buffers using up memory, which then
leads to garbage data in the memory store.

## How was this patch tested?

Ran the old failing test in a loop. Full tests on jenkins

Closes #22546 from squito/SPARK-25422-master.

Authored-by: Imran Rashid <iras...@cloudera.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
(cherry picked from commit 9bb3a0c67bd851b09ff4701ef1d280e2a77d791b)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f91247f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f91247f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f91247f8

Branch: refs/heads/branch-2.4
Commit: f91247f812f87daa9fe4ec23b100f2310254df22
Parents: 544f86a
Author: Imran Rashid <iras...@cloudera.com>
Authored: Wed Sep 26 08:45:27 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Sep 26 08:45:56 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 13 +++---
 .../spark/util/io/ChunkedByteBuffer.scala       | 47 +++++++++++---------
 2 files changed, 31 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f91247f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2234146..0fe82ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -438,10 +438,8 @@ private[spark] class BlockManager(
         // stream.
         channel.close()
         // TODO SPARK-25035 Even if we're only going to write the data to disk 
after this, we end up
-        // using a lot of memory here.  With encryption, we'll read the whole 
file into a regular
-        // byte buffer and OOM.  Without encryption, we'll memory map the file 
and won't get a jvm
-        // OOM, but might get killed by the OS / cluster manager.  We could at 
least read the tmp
-        // file as a stream in both cases.
+        // using a lot of memory here. We'll read the whole file into a regular
+        // byte buffer and OOM.  We could at least read the tmp file as a 
stream.
         val buffer = securityManager.getIOEncryptionKey() match {
           case Some(key) =>
             // we need to pass in the size of the unencrypted block
@@ -453,7 +451,7 @@ private[spark] class BlockManager(
             new EncryptedBlockData(tmpFile, blockSize, conf, 
key).toChunkedByteBuffer(allocator)
 
           case None =>
-            ChunkedByteBuffer.map(tmpFile, 
conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+            ChunkedByteBuffer.fromFile(tmpFile, 
conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
         }
         putBytes(blockId, buffer, level)(classTag)
         tmpFile.delete()
@@ -726,10 +724,9 @@ private[spark] class BlockManager(
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
     // TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
-    // could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+    // could just use the inputStream on the temp file, rather than reading 
the file into memory.
     // Until then, replication can cause the process to use too much memory 
and get killed
-    // by the OS / cluster manager (not a java OOM, since it's a memory-mapped 
file) even though
-    // we've read the data to disk.
+    // even though we've read the data to disk.
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/f91247f8/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 39f050f..4aa8d45 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -19,17 +19,16 @@ package org.apache.spark.util.io
 
 import java.io.{File, FileInputStream, InputStream}
 import java.nio.ByteBuffer
-import java.nio.channels.{FileChannel, WritableByteChannel}
-import java.nio.file.StandardOpenOption
-
-import scala.collection.mutable.ListBuffer
+import java.nio.channels.WritableByteChannel
 
+import com.google.common.io.ByteStreams
 import com.google.common.primitives.UnsignedBytes
+import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
-import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.network.util.{ByteArrayWritableChannel, 
LimitedInputStream}
 import org.apache.spark.storage.StorageUtils
 import org.apache.spark.util.Utils
 
@@ -175,30 +174,36 @@ object ChunkedByteBuffer {
   def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
-        map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+        fromFile(f.getFile, maxChunkSize, f.getOffset, f.getLength)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
   }
 
-  def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
-    map(file, maxChunkSize, 0, file.length())
+  def fromFile(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
+    fromFile(file, maxChunkSize, 0, file.length())
   }
 
-  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
-    Utils.tryWithResource(FileChannel.open(file.toPath, 
StandardOpenOption.READ)) { channel =>
-      var remaining = length
-      var pos = offset
-      val chunks = new ListBuffer[ByteBuffer]()
-      while (remaining > 0) {
-        val chunkSize = math.min(remaining, maxChunkSize)
-        val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
-        pos += chunkSize
-        remaining -= chunkSize
-        chunks += chunk
-      }
-      new ChunkedByteBuffer(chunks.toArray)
+  private def fromFile(
+      file: File,
+      maxChunkSize: Int,
+      offset: Long,
+      length: Long): ChunkedByteBuffer = {
+    // We do *not* memory map the file, because we may end up putting this 
into the memory store,
+    // and spark currently is not expecting memory-mapped buffers in the 
memory store, it conflicts
+    // with other parts that manage the lifecyle of buffers and dispose them.  
See SPARK-25422.
+    val is = new FileInputStream(file)
+    ByteStreams.skipFully(is, offset)
+    val in = new LimitedInputStream(is, length)
+    val chunkSize = math.min(maxChunkSize, length).toInt
+    val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate 
_)
+    Utils.tryWithSafeFinally {
+      IOUtils.copy(in, out)
+    } {
+      in.close()
+      out.close()
     }
+    out.toChunkedByteBuffer
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to