Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r210153952 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -406,6 +407,61 @@ private[spark] class BlockManager( putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag) } + override def putBlockDataAsStream( + blockId: BlockId, + level: StorageLevel, + classTag: ClassTag[_]): StreamCallbackWithID = { + // TODO if we're going to only put the data in the disk store, we should just write it directly + // to the final location, but that would require a deeper refactor of this code. So instead + // we just write to a temp file, and call putBytes on the data in that file. + val tmpFile = diskBlockManager.createTempLocalBlock()._2 + val channel = new CountingWritableChannel( + Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile)))) + logTrace(s"Streaming block $blockId to tmp file $tmpFile") + new StreamCallbackWithID { + + override def getID: String = blockId.name + + override def onData(streamId: String, buf: ByteBuffer): Unit = { + while (buf.hasRemaining) { + channel.write(buf) + } + } + + override def onComplete(streamId: String): Unit = { + logTrace(s"Done receiving block $blockId, now putting into local blockManager") + // Read the contents of the downloaded file as a buffer to put into the blockManager. + // Note this is all happening inside the netty thread as soon as it reads the end of the + // stream. + channel.close() + // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up + // using a lot of memory here. We won't get a jvm OOM, but might get killed by the + // OS / cluster manager. We could at least read the tmp file as a stream. + val buffer = securityManager.getIOEncryptionKey() match { + case Some(key) => + // we need to pass in the size of the unencrypted block + val blockSize = channel.getCount + val allocator = level.memoryMode match { + case MemoryMode.ON_HEAP => ByteBuffer.allocate _ + case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ + } + new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator) --- End diff -- yeah, you store the entire file in memory (after decrypting). its not memory mapped either, so it'll probably be a regular OOM (depending on memory mode). updated the comment
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org