Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207968882

    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -404,6 +405,47 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code. So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    new StreamCallbackWithID {
    +      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
    +
    +      override def getID: String = blockId.name
    +
    +      override def onData(streamId: String, buf: ByteBuffer): Unit = {
    +        while (buf.hasRemaining) {
    +          channel.write(buf)
    +        }
    +      }
    +
    +      override def onComplete(streamId: String): Unit = {
    +        // Read the contents of the downloaded file as a buffer to put into the blockManager.
    +        // Note this is all happening inside the netty thread as soon as it reads the end of the
    +        // stream.
    +        channel.close()
    +        // TODO Even if we're only going to write the data to disk after this, we end up using a lot
    +        // of memory here. We won't get a JVM OOM, but might get killed by the OS / cluster
    --- End diff --
    
    filed SPARK-25035
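For readers outside the Spark codebase: the diff above cuts off inside onComplete, but the pattern it follows (buffer stream chunks into a temp file, then materialize the file and hand it off, which is where the memory cost in the TODO comes from) can be sketched in isolation. The snippet below is an illustrative sketch only, not the PR's actual code: the StreamCallback trait is a stand-in for Spark's StreamCallbackWithID, and `consume` is a hypothetical stand-in for putBytes.

    import java.io.{File, FileOutputStream}
    import java.nio.ByteBuffer
    import java.nio.channels.{Channels, WritableByteChannel}

    // Hypothetical stand-in for Spark's StreamCallbackWithID, with the same
    // three methods used in the diff above.
    trait StreamCallback {
      def getID: String
      def onData(streamId: String, buf: ByteBuffer): Unit
      def onComplete(streamId: String): Unit
    }

    // Sketch of the write-to-temp-file-then-consume pattern: stream chunks land
    // in a temp file, and onComplete reads the whole file back into memory
    // before handing it off (`consume` stands in for putBytes).
    class TempFileStreamCallback(id: String, consume: Array[Byte] => Unit)
        extends StreamCallback {
      private val tmpFile: File = File.createTempFile("block-", ".tmp")
      private val channel: WritableByteChannel =
        Channels.newChannel(new FileOutputStream(tmpFile))

      override def getID: String = id

      override def onData(streamId: String, buf: ByteBuffer): Unit = {
        // A single write may not drain the buffer, hence the loop (as in the diff).
        while (buf.hasRemaining) {
          channel.write(buf)
        }
      }

      override def onComplete(streamId: String): Unit = {
        channel.close()
        // This is where the memory cost noted in the TODO shows up: the entire
        // file is materialized as one byte array before being consumed.
        val bytes = java.nio.file.Files.readAllBytes(tmpFile.toPath)
        tmpFile.delete()
        consume(bytes)
      }
    }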