Repository: spark
Updated Branches:
  refs/heads/master cb368f2c2 -> 59741887e


[SPARK-25905][CORE] When getting a remote block, avoid forcing a conversion to a ChunkedByteBuffer

## What changes were proposed in this pull request?

In `BlockManager`, `getRemoteValues` gets a `ChunkedByteBuffer` (by calling 
`getRemoteBytes`) and creates an `InputStream` from it. `getRemoteBytes`, in 
turn, gets a `ManagedBuffer` and converts it to a `ChunkedByteBuffer`.
Instead, introduce a private `getRemoteManagedBuffer` method so that 
`getRemoteValues` can get the `ManagedBuffer` directly and read from its 
`InputStream`, skipping the conversion.
When reading a remote cache block that was persisted to disk, this avoids 
materializing the whole block on the heap and reduces heap memory usage 
significantly.
`getRemoteBytes` is retained for its other callers.
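
For illustration, a condensed before/after sketch of the read path (fragments 
only; the surrounding class, fetch, and retry logic are elided, and the names 
match the diff below):

```scala
// Before: the fetched ManagedBuffer was eagerly copied into a
// ChunkedByteBuffer, materializing the entire block on the heap.
getRemoteBytes(blockId).map { data =>
  val values = serializerManager.dataDeserializeStream(
    blockId, data.toInputStream(dispose = true))(ct)
  new BlockResult(values, DataReadMethod.Network, data.size)
}

// After: the ManagedBuffer's own InputStream is used directly, so a
// disk-backed buffer is streamed rather than read fully into memory.
getRemoteManagedBuffer(blockId).map { data =>
  val values = serializerManager.dataDeserializeStream(
    blockId, data.createInputStream())(ct)
  new BlockResult(values, DataReadMethod.Network, data.size)
}
```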

## How was this patch tested?

Imran Rashid wrote an application 
(https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala)
 that, among other things, tests reading remote cache blocks. I ran this 
application with 2500MB blocks to test reading a cache block persisted on 
disk. Without this change, the test fails with `java.lang.OutOfMemoryError: 
Java heap space` even with `--executor-memory 5g`. With the change, it passes 
with `--executor-memory 2g`.
I also ran the unit tests in core. In particular, `DistributedSuite` has a set 
of tests that exercise the `getRemoteValues` code path. `BlockManagerSuite` has 
several tests that call `getRemoteBytes`; I left these unchanged, so 
`getRemoteBytes` still gets exercised.
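
For reference, the `DistributedSuite` pattern that drives the 
`getRemoteValues` path looks roughly like this (a condensed sketch, not the 
verbatim test; it assumes an active SparkContext `sc` on a multi-executor 
cluster, so the cached blocks are remote to the BlockManager doing the get):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

// Cache the data on the executors, then fetch every block through the
// local BlockManager; blocks held by other executors are remote, so
// get() falls through to getRemoteValues.
val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY)
data.count()  // materialize the cached blocks

val blockIds = data.partitions.indices.map(i => RDDBlockId(data.id, i))
val blockManager = SparkEnv.get.blockManager
assert(blockIds.flatMap(id => blockManager.get[Int](id).get.data).toSet
  == (1 to 1000).toSet)
```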

Closes #23058 from wypoon/SPARK-25905.

Authored-by: Wing Yew Poon <wyp...@cloudera.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59741887
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59741887
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59741887

Branch: refs/heads/master
Commit: 59741887e272be92ebd6e61783f99f7d8fc05456
Parents: cb368f2
Author: Wing Yew Poon <wyp...@cloudera.com>
Authored: Thu Nov 29 14:56:34 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu Nov 29 14:56:34 2018 -0600

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 43 ++++++++++++--------
 .../spark/util/io/ChunkedByteBuffer.scala       |  2 -
 .../org/apache/spark/DistributedSuite.scala     |  2 +-
 3 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59741887/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1b61729..1dfbc6e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -692,9 +692,9 @@ private[spark] class BlockManager(
    */
   private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val ct = implicitly[ClassTag[T]]
-    getRemoteBytes(blockId).map { data =>
+    getRemoteManagedBuffer(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
+        serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
       new BlockResult(values, DataReadMethod.Network, data.size)
     }
   }
@@ -717,13 +717,9 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Get block from remote block managers as serialized bytes.
+   * Get block from remote block managers as a ManagedBuffer.
    */
-  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
-    // could just use the inputStream on the temp file, rather than reading the file into memory.
-    // Until then, replication can cause the process to use too much memory and get killed
-    // even though we've read the data to disk.
+  private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = {
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0
@@ -788,14 +784,13 @@ private[spark] class BlockManager(
       }
 
       if (data != null) {
-        // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
-        // ChunkedByteBuffer, to go back to old code-path.  Can be removed post Spark 2.4 if
-        // new path is stable.
-        if (remoteReadNioBufferConversion) {
-          return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
-        } else {
-          return Some(ChunkedByteBuffer.fromManagedBuffer(data))
-        }
+        // If the ManagedBuffer is a BlockManagerManagedBuffer, the disposal of the
+        // byte buffers backing it may need to be handled after reading the bytes.
+        // In this case, since we just fetched the bytes remotely, we do not have
+        // a BlockManagerManagedBuffer. The assert here is to ensure that this holds
+        // true (or the disposal is handled).
+        assert(!data.isInstanceOf[BlockManagerManagedBuffer])
+        return Some(data)
       }
       logDebug(s"The value of block $blockId is null")
     }
@@ -804,6 +799,22 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get block from remote block managers as serialized bytes.
+   */
+  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+    getRemoteManagedBuffer(blockId).map { data =>
+      // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
+      // ChunkedByteBuffer, to go back to old code-path.  Can be removed post Spark 2.4 if
+      // new path is stable.
+      if (remoteReadNioBufferConversion) {
+        new ChunkedByteBuffer(data.nioByteBuffer())
+      } else {
+        ChunkedByteBuffer.fromManagedBuffer(data)
+      }
+    }
+  }
+
+  /**
    * Get a block from the block manager (either local or remote).
    *
    * This acquires a read lock on the block if the block was stored locally and does not acquire

http://git-wip-us.apache.org/repos/asf/spark/blob/59741887/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 128d6ff..2c3730d 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -172,8 +172,6 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 
 private[spark] object ChunkedByteBuffer {
 
-
-  // TODO SPARK-25905 eliminate this method if we switch BlockManager to getting InputStreams
   def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>

http://git-wip-us.apache.org/repos/asf/spark/blob/59741887/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 629a323..4083b20 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -195,7 +195,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
         new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream())(data.elementClassTag).toList
       assert(deserialized === (1 to 100).toList)
     }
-    // This will exercise the getRemoteBytes / getRemoteValues code paths:
+    // This will exercise the getRemoteValues code path:
     assert(blockIds.flatMap(id => blockManager.get[Int](id).get.data).toSet === (1 to 1000).toSet)
   }
 

