Repository: spark
Updated Branches:
  refs/heads/branch-2.4 11e07812c -> 881a60403


[SPARK-25827][CORE] Avoid converting incoming encrypted blocks to byte buffers

## What changes were proposed in this pull request?

Avoid converting encrypted blocks to regular ByteBuffers, to ensure they can
be sent over the network for replication and remote reads even when > 2GB.

Also updates some TODOs with links to SPARK-25905, which tracks improving
the handling here.
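
Background, not part of this patch: java.nio.ByteBuffer.allocate takes an Int,
so a single ByteBuffer can never hold more than Int.MaxValue (~2GB) bytes, and
narrowing a larger Long size with .toInt silently wraps negative. A minimal
standalone sketch of the failure mode (TwoGbLimitDemo is a hypothetical name,
not part of Spark):

    import java.nio.ByteBuffer

    object TwoGbLimitDemo {
      def main(args: Array[String]): Unit = {
        val blockSize = 3L * 1024 * 1024 * 1024      // a hypothetical 3GB block
        println(blockSize.toInt)                     // wraps to -1073741824
        // A negative capacity makes allocate throw IllegalArgumentException.
        try ByteBuffer.allocate(blockSize.toInt)
        catch { case e: IllegalArgumentException => println(s"allocate failed: $e") }
      }
    }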

## How was this patch tested?

Tested on a cluster with encrypted data > 2GB (after SPARK-25904 was
applied as well).

Closes #22917 from squito/real_SPARK-25827.

Authored-by: Imran Rashid <iras...@cloudera.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
(cherry picked from commit 7ea594e7876258296f340daddefcaf71a64ab824)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/881a6040
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/881a6040
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/881a6040

Branch: refs/heads/branch-2.4
Commit: 881a60403ffcc178e9177726470fe07ece0cb6f8
Parents: 11e0781
Author: Imran Rashid <iras...@cloudera.com>
Authored: Fri Nov 2 13:24:55 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Fri Nov 2 13:25:07 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/network/BlockTransferService.scala  | 4 +++-
 .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +-
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala   | 5 +++--
 .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala     | 6 ++++--
 4 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/881a6040/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index eef8c31..a58c8fa 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ShuffleClient}
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, EncryptedManagedBuffer, StorageLevel}
 import org.apache.spark.util.ThreadUtils
 
 private[spark]
@@ -104,6 +104,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
           data match {
             case f: FileSegmentManagedBuffer =>
               result.success(f)
+            case e: EncryptedManagedBuffer =>
+              result.success(e)
             case _ =>
               val ret = ByteBuffer.allocate(data.size.toInt)
               ret.put(data.nioByteBuffer())
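
For context, the hunk above can be read as the condensed sketch below
(toResult and BufferPassThroughSketch are hypothetical names, and since
EncryptedManagedBuffer is private[spark] this compiles only inside the
org.apache.spark package tree): file-backed buffers, plain or encrypted, are
passed through untouched, and only the in-memory fallback keeps the Int-sized
copy, which is safe just for blocks under 2GB.

    package org.apache.spark.network

    import java.nio.ByteBuffer

    import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
    import org.apache.spark.storage.EncryptedManagedBuffer

    private[spark] object BufferPassThroughSketch {
      // Condensed view of the dispatch in BlockTransferService above.
      def toResult(data: ManagedBuffer): ManagedBuffer = data match {
        case f: FileSegmentManagedBuffer => f  // file-backed: no copy, no size limit
        case e: EncryptedManagedBuffer => e    // encrypted file-backed: no copy, no size limit
        case other =>
          // Copy path: only correct while other.size fits in an Int (< 2GB).
          val ret = ByteBuffer.allocate(other.size.toInt)
          ret.put(other.nioByteBuffer())
          ret.flip()
          new NioManagedBuffer(ret)
      }
    }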

http://git-wip-us.apache.org/repos/asf/spark/blob/881a6040/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c01a453..e35dd72 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -721,7 +721,7 @@ private[spark] class BlockManager(
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
+    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
     // could just use the inputStream on the temp file, rather than reading the file into memory.
     // Until then, replication can cause the process to use too much memory and get killed
     // even though we've read the data to disk.

http://git-wip-us.apache.org/repos/asf/spark/blob/881a6040/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index d88bd71..841e16a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -201,7 +201,7 @@ private class DiskBlockData(
   private def open() = new FileInputStream(file).getChannel
 }
 
-private class EncryptedBlockData(
+private[spark] class EncryptedBlockData(
     file: File,
     blockSize: Long,
     conf: SparkConf,
@@ -263,7 +263,8 @@ private class EncryptedBlockData(
   }
 }
 
-private class EncryptedManagedBuffer(val blockData: EncryptedBlockData) extends ManagedBuffer {
+private[spark] class EncryptedManagedBuffer(
+    val blockData: EncryptedBlockData) extends ManagedBuffer {
 
   // This is the size of the decrypted data
   override def size(): Long = blockData.size
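
The widening from private to private[spark] is what lets code outside
org.apache.spark.storage (the BlockTransferService and ChunkedByteBuffer
matches in this commit) refer to these classes. A tiny illustration of
Scala's package-qualified visibility, with hypothetical names:

    package org.apache.spark.storage {
      // Visible anywhere under org.apache.spark, but nowhere outside it.
      private[spark] class Example
    }

    package org.apache.spark.network {
      object Uses {
        val ok = new org.apache.spark.storage.Example  // compiles: same spark root
      }
    }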

http://git-wip-us.apache.org/repos/asf/spark/blob/881a6040/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 9547cb4..da2be84 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -29,7 +29,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
-import org.apache.spark.storage.StorageUtils
+import org.apache.spark.storage.{EncryptedManagedBuffer, StorageUtils}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
@@ -173,11 +173,13 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 private[spark] object ChunkedByteBuffer {
 
 
-  // TODO eliminate this method if we switch BlockManager to getting InputStreams
+  // TODO SPARK-25905 eliminate this method if we switch BlockManager to getting InputStreams
   def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
         fromFile(f.getFile, f.getOffset, f.getLength)
+      case e: EncryptedManagedBuffer =>
+        e.blockData.toChunkedByteBuffer(ByteBuffer.allocate _)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
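
The new branch hands ByteBuffer.allocate to toChunkedByteBuffer as the
allocator, so the decrypted data is read into a sequence of independent
chunks rather than one contiguous buffer; each chunk stays under Int.MaxValue
even when the block as a whole is larger than 2GB. A standalone sketch of
that chunked-allocation idea (allocateChunks, ChunkedAllocSketch, and the 8MB
chunk size are hypothetical, not Spark's values):

    import java.nio.ByteBuffer

    object ChunkedAllocSketch {
      val maxChunk = 8 * 1024 * 1024  // hypothetical per-chunk cap, well under 2GB

      // Split a Long-sized total into Int-sized chunks built by `allocator`,
      // mirroring how an allocator function sidesteps the single-buffer limit.
      def allocateChunks(total: Long, allocator: Int => ByteBuffer): Array[ByteBuffer] =
        Iterator.iterate(total)(_ - maxChunk)
          .takeWhile(_ > 0)
          .map(remaining => allocator(math.min(remaining, maxChunk.toLong).toInt))
          .toArray

      def main(args: Array[String]): Unit = {
        val chunks = allocateChunks(30L * 1024 * 1024, ByteBuffer.allocate _)
        // 30MB total -> 4 chunks: 8MB, 8MB, 8MB, 6MB
        println(chunks.map(_.capacity).mkString(", "))
      }
    }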

