Repository: spark
Updated Branches:
  refs/heads/branch-2.0 72a0ee3ab -> b45940e67


[SPARK-17204][CORE] Fix replicated off heap storage

(Jira: https://issues.apache.org/jira/browse/SPARK-17204)

There are a couple of bugs in the `BlockManager` with respect to support for 
replicated off-heap storage. First, the locally-stored off-heap byte buffer is 
disposed of when it is replicated. It should not be. Second, the replica byte 
buffers are stored as heap byte buffers instead of direct byte buffers even 
when the storage level memory mode is off-heap. This PR addresses both of these 
problems.
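
As a sketch of the second fix (hedged; the actual patch copies through
`ChunkedByteBuffer.copy(Platform.allocateDirectBuffer)`, shown in the diff
below): when the target memory mode is off-heap but any incoming chunk lives
on the heap, the bytes are copied into direct buffers first. The helper name
`ensureDirect` is illustrative, not part of the patch:

```scala
import java.nio.ByteBuffer

// Illustrative only: mirrors the idea of the fix in BlockManager.putBytes,
// using plain java.nio so the example is self-contained.
def ensureDirect(chunks: Array[ByteBuffer]): Array[ByteBuffer] = {
  if (chunks.forall(_.isDirect)) {
    chunks // already off-heap; nothing to copy
  } else {
    chunks.map { chunk =>
      val src = chunk.duplicate() // leave the caller's position untouched
      val direct = ByteBuffer.allocateDirect(src.remaining())
      direct.put(src)
      direct.flip() // make the copy readable from position 0
      direct
    }
  }
}
```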

`BlockManagerReplicationSuite` was enhanced to fill in the coverage gaps. It 
now fails if either of the bugs fixed by this PR is present.
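
The essence of the added assertions, as a minimal standalone sketch
(simplified names; `useOffHeap` stands in for `storageLevel.useOffHeap`):
every chunk backing a serialized block must match the level's memory mode,
and replicated values must round-trip intact:

```scala
import java.nio.ByteBuffer

// Sketch of the invariant the suite now enforces for serialized levels.
val useOffHeap = true // e.g. StorageLevel.OFF_HEAP
val chunk = ByteBuffer.allocateDirect(16) // a chunk of the stored block
assert(useOffHeap == chunk.isDirect,
  s"off-heap level stored as ${chunk.getClass.getSimpleName}")
```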

Author: Michael Allman <mich...@videoamp.com>

Closes #17390 from mallman/spark-17204-replicated_off_heap_storage-2.0_backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b45940e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b45940e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b45940e6

Branch: refs/heads/branch-2.0
Commit: b45940e676e193b2095ea0199a471427700b0d7f
Parents: 72a0ee3
Author: Michael Allman <mich...@videoamp.com>
Authored: Fri Mar 24 12:52:10 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Mar 24 12:52:10 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 21 +++++++-
 .../org/apache/spark/storage/StorageUtils.scala | 52 +++++++++++++++++---
 .../spark/util/ByteBufferInputStream.scala      |  8 +--
 .../spark/util/io/ChunkedByteBuffer.scala       | 27 ++++++++--
 .../storage/BlockManagerReplicationSuite.scala  | 20 ++++++--
 5 files changed, 104 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b45940e6/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 37dfbd6..f9e48b2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -291,6 +291,9 @@ private[spark] class BlockManager(
 
   /**
    * Put the block locally, using the given storage level.
+   *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
    */
   override def putBlockData(
       blockId: BlockId,
@@ -731,6 +734,9 @@ private[spark] class BlockManager(
   /**
    * Put a new block of serialized bytes to the block manager.
    *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   *
    * @return true if the block was stored or false if an error occurred.
    */
   def putBytes[T: ClassTag](
@@ -748,6 +754,9 @@ private[spark] class BlockManager(
    *
    * If the block already exists, this method will not overwrite it.
    *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   *
   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
   *                     block already exists). If false, this method will hold no locks when it
    *                     returns.
@@ -791,7 +800,15 @@ private[spark] class BlockManager(
               false
           }
         } else {
-          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
+          val memoryMode = level.memoryMode
+          memoryStore.putBytes(blockId, size, memoryMode, () => {
+            if (memoryMode == MemoryMode.OFF_HEAP &&
+                bytes.chunks.exists(buffer => !buffer.isDirect)) {
+              bytes.copy(Platform.allocateDirectBuffer)
+            } else {
+              bytes
+            }
+          })
         }
         if (!putSucceeded && level.useDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
@@ -996,7 +1013,7 @@ private[spark] class BlockManager(
           try {
             replicate(blockId, bytesToReplicate, level, remoteClassTag)
           } finally {
-            bytesToReplicate.dispose()
+            bytesToReplicate.unmap()
           }
           logDebug("Put block %s remotely took %s"
             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
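
The `dispose()` to `unmap()` change at the end of this file is the heart of
the first fix: `bytesToReplicate` may alias the buffer the local memory store
still holds (a single-chunk `ChunkedByteBuffer` is duplicated, not copied,
when converted), so disposing it frees live off-heap data. A hedged,
standalone sketch of the aliasing hazard:

```scala
import java.nio.ByteBuffer

// Sketch only: two views over one direct allocation. Freeing the memory
// through either view (as StorageUtils.dispose does via sun.misc.Cleaner)
// invalidates both; unmap() avoids this by releasing only buffers that are
// genuinely memory-mapped from a file.
val stored = ByteBuffer.allocateDirect(8).putLong(42L)
val replica = stored.duplicate() // shares the same off-heap memory
assert(replica.getLong(0) == 42L) // both views read the one allocation
```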

http://git-wip-us.apache.org/repos/asf/spark/blob/b45940e6/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index fb9941b..d160a25 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -231,22 +231,60 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 
 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+  // Ewwww... Reflection!!! See the unmap method for justification
+  private val memoryMappedBufferFileDescriptorField = {
+    val mappedBufferClass = classOf[java.nio.MappedByteBuffer]
+    val fdField = mappedBufferClass.getDeclaredField("fd")
+    fdField.setAccessible(true)
+    fdField
+  }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
+   * API that will cause errors if one attempts to read from the disposed buffer. However, neither
+   * the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put
+   * pressure on the garbage collector. Waiting for garbage collection may lead to the depletion of
+   * off-heap memory or huge numbers of open files. There's unfortunately no standard API to
+   * manually dispose of these kinds of buffers.
+   *
+   * See also [[unmap]]
    */
   def dispose(buffer: ByteBuffer): Unit = {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      logTrace(s"Disposing of $buffer")
+      cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
+    }
+  }
+
+  /**
+   * Attempt to unmap a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that will
+   * cause errors if one attempts to read from the unmapped buffer. However, the file descriptors of
+   * memory-mapped buffers do not put pressure on the garbage collector. Waiting for garbage
+   * collection may lead to huge numbers of open files. There's unfortunately no standard API to
+   * manually unmap memory-mapped buffers.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      // Note that direct buffers are instances of MappedByteBuffer. As things stand in Java 8, the
+      // JDK does not provide a public API to distinguish between direct buffers and memory-mapped
+      // buffers. As an alternative, we peek beneath the curtains and look for a non-null file
+      // descriptor in mappedByteBuffer
+      if (memoryMappedBufferFileDescriptorField.get(buffer) != null) {
+        logTrace(s"Unmapping $buffer")
+        cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
       }
     }
   }
 
+  private def cleanDirectBuffer(buffer: DirectBuffer) = {
+    val cleaner = buffer.cleaner()
+    if (cleaner != null) {
+      cleaner.clean()
+    }
+  }
+
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.
    * This method overwrites the old values stored in the RDDInfo's.
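
A standalone sketch of the `fd` probe used above (assumes a Java 8 class
library, where `DirectByteBuffer` extends `MappedByteBuffer` and the private
`fd` field exists; later JDKs may reject this reflection without
`--add-opens`):

```scala
import java.nio.{ByteBuffer, MappedByteBuffer}

// On Java 8, every direct buffer is a MappedByteBuffer, so an instanceOf
// check alone cannot distinguish the two. Only a buffer mapped from a file
// carries a non-null file descriptor.
val fdField = classOf[MappedByteBuffer].getDeclaredField("fd")
fdField.setAccessible(true)

def isTrulyMapped(buffer: ByteBuffer): Boolean =
  buffer.isInstanceOf[MappedByteBuffer] && fdField.get(buffer) != null

// A plain direct buffer is not considered mapped:
assert(!isTrulyMapped(ByteBuffer.allocateDirect(4)))
```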

http://git-wip-us.apache.org/repos/asf/spark/blob/b45940e6/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
index dce2ac6..50dc948 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
@@ -23,11 +23,10 @@ import java.nio.ByteBuffer
 import org.apache.spark.storage.StorageUtils
 
 /**
- * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
- * at the end of the stream (e.g. to close a memory-mapped file).
+ * Reads data from a ByteBuffer.
  */
 private[spark]
-class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
+class ByteBufferInputStream(private var buffer: ByteBuffer)
   extends InputStream {
 
   override def read(): Int = {
@@ -72,9 +71,6 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f
    */
   private def cleanUp() {
     if (buffer != null) {
-      if (dispose) {
-        StorageUtils.dispose(buffer)
-      }
       buffer = null
     }
   }
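
With the `dispose` flag gone, buffer lifetime is managed by the buffer's
owner rather than by the stream. A hedged usage sketch (these classes are
`private[spark]`, so this only compiles inside the Spark source tree):

```scala
import java.nio.ByteBuffer

import org.apache.spark.storage.StorageUtils
import org.apache.spark.util.ByteBufferInputStream

// The stream no longer disposes its buffer at end-of-stream; the owner of
// the buffer decides when to release it.
val buffer = ByteBuffer.allocateDirect(64)
val in = new ByteBufferInputStream(buffer)
try {
  while (in.read() != -1) {} // drain the stream
} finally {
  StorageUtils.dispose(buffer) // explicit, caller-controlled cleanup
}
```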

http://git-wip-us.apache.org/repos/asf/spark/blob/b45940e6/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 89b0874..810cc38 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -86,7 +86,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }
 
   /**
-   * Copy this buffer into a new ByteBuffer.
+   * Convert this buffer to a ByteBuffer. If this buffer is backed by a single chunk, its underlying
+   * data will not be copied. Instead, it will be duplicated. If this buffer is backed by multiple
+   * chunks, the data underlying this buffer will be copied into a new byte buffer. As a result, it
+   * is suggested to use this method only if the caller does not need to manage the memory
+   * underlying this buffer.
    *
   * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size.
    */
@@ -132,10 +136,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up any ByteBuffer in this ChunkedByteBuffer which is direct or memory-mapped.
+   * See [[StorageUtils.dispose]] for more information.
+   *
+   * See also [[unmap]]
    */
   def dispose(): Unit = {
     if (!disposed) {
@@ -143,6 +147,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
       disposed = true
     }
   }
+
+  /**
+   * Attempt to unmap any ByteBuffer in this ChunkedByteBuffer if it is memory-mapped. See
+   * [[StorageUtils.unmap]] for more information.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(): Unit = {
+    if (!disposed) {
+      chunks.foreach(StorageUtils.unmap)
+      disposed = true
+    }
+  }
 }
 
 /**
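
Putting the two `ChunkedByteBuffer` methods side by side, the rule of thumb
in this patch is ownership: call `dispose()` only when the caller exclusively
owns every chunk; call `unmap()` when chunks may still back the local store
and only memory-mapped file handles should be released. A hedged sketch of
the replication path (again `private[spark]`, with the replicate call
elided):

```scala
import java.nio.ByteBuffer

import org.apache.spark.util.io.ChunkedByteBuffer

// Sketch of the corrected pattern from BlockManager: after replication,
// release only what is not shared with the local memory store.
val bytesToReplicate = new ChunkedByteBuffer(Array(ByteBuffer.allocateDirect(16)))
try {
  // ... replicate(blockId, bytesToReplicate, level, remoteClassTag) ...
} finally {
  // unmap() frees memory-mapped chunks (e.g. blocks read back from disk)
  // but leaves direct chunks alone, since they may still back local storage.
  bytesToReplicate.unmap()
}
```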

http://git-wip-us.apache.org/repos/asf/spark/blob/b45940e6/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b9e3a36..c294b5d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -373,7 +373,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite
       // Put the block into one of the stores
       val blockId = new TestBlockId(
         "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
-      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      val testValue = Array.fill[Byte](blockSize)(1)
+      stores(0).putSingle(blockId, testValue, storageLevel)
 
       // Assert that the master knows two locations for the block
       val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -385,12 +386,23 @@ class BlockManagerReplicationSuite extends SparkFunSuite
        testStore => blockLocations.contains(testStore.blockManagerId.executorId)
       }.foreach { testStore =>
         val testStoreName = testStore.blockManagerId.executorId
-        assert(
-          testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
-        testStore.releaseLock(blockId)
+        val blockResultOpt = testStore.getLocalValues(blockId)
+        assert(blockResultOpt.isDefined, s"$blockId was not found in $testStoreName")
+        val localValues = blockResultOpt.get.data.toSeq
+        assert(localValues.size == 1)
+        assert(localValues.head === testValue)
         assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
           s"master does not have status for ${blockId.name} in $testStoreName")
 
+        val memoryStore = testStore.memoryStore
+        if (memoryStore.contains(blockId) && !storageLevel.deserialized) {
+          memoryStore.getBytes(blockId).get.chunks.foreach { byteBuffer =>
+            assert(storageLevel.useOffHeap == byteBuffer.isDirect,
+              s"memory mode ${storageLevel.memoryMode} is not compatible with " +
+                byteBuffer.getClass.getSimpleName)
+          }
+        }
+
+        val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
 
        // Assert that block status in the master for this store has expected storage level


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
