Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r210153952
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -406,6 +407,61 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    val channel = new CountingWritableChannel(
    +      Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
    +    logTrace(s"Streaming block $blockId to tmp file $tmpFile")
    +    new StreamCallbackWithID {
    +
    +      override def getID: String = blockId.name
    +
    +      override def onData(streamId: String, buf: ByteBuffer): Unit = {
    +        while (buf.hasRemaining) {
    +          channel.write(buf)
    +        }
    +      }
    +
    +      override def onComplete(streamId: String): Unit = {
    +        logTrace(s"Done receiving block $blockId, now putting into local blockManager")
    +        // Read the contents of the downloaded file as a buffer to put into the blockManager.
    +        // Note this is all happening inside the netty thread as soon as it reads the end of the
    +        // stream.
    +        channel.close()
    +        // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
    +        // using a lot of memory here.  We won't get a jvm OOM, but might get killed by the
    +        // OS / cluster manager.  We could at least read the tmp file as a stream.
    +        val buffer = securityManager.getIOEncryptionKey() match {
    +          case Some(key) =>
    +            // we need to pass in the size of the unencrypted block
    +            val blockSize = channel.getCount
    +            val allocator = level.memoryMode match {
    +              case MemoryMode.ON_HEAP => ByteBuffer.allocate _
    +              case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
    +            }
    +            new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
    --- End diff --
    
    Yeah, you store the entire file in memory (after decrypting).  It's not
    memory mapped either, so it'll probably be a regular OOM (depending on
    memory mode).  Updated the comment.
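
    For illustration only, here is a minimal sketch of the "read the tmp file
    as a stream" idea from the TODO. It copies a file through a fixed-size
    buffer, so memory use is bounded by the chunk size and not by the file
    size. `ChunkedCopySketch`, `copyInChunks`, and the 64 KiB default are
    hypothetical names and values, not part of Spark, and the sketch ignores
    encryption and the block manager entirely.

```scala
import java.io.{File, FileInputStream, FileOutputStream}

// Hypothetical helper, not Spark code: shows bounded-memory copying only.
object ChunkedCopySketch {

  /** Copies `src` to `dst`, holding at most `chunkSize` bytes in memory at once.
    * Returns the number of bytes copied.
    */
  def copyInChunks(src: File, dst: File, chunkSize: Int = 64 * 1024): Long = {
    val in = new FileInputStream(src)
    try {
      val out = new FileOutputStream(dst)
      try {
        val buf = new Array[Byte](chunkSize) // the only sizeable allocation
        var total = 0L
        var n = in.read(buf)
        while (n != -1) {
          out.write(buf, 0, n) // write only the bytes actually read
          total += n
          n = in.read(buf)
        }
        total
      } finally out.close()
    } finally in.close()
  }
}
```

    By contrast, materializing the whole file as one buffer needs memory
    proportional to the block size, which is the concern raised above.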

